网站开发建设准备工作,做网站需要关注哪些重要问题,帮推广平台,广州白云建方舱医院文章目录 一、创建核心类1, 交换机2, 交换机类型3, 队列4, 绑定5, 交换机转发 绑定规则6, 消息7, 消息属性 二、数据库设计1, 使用 SQLite2, 使用 MyBatis2.1, 创建 Interface2.2, 创建 xml 文件 三、硬盘管理 -- 数据库1, 创建 DataBaseManager 类2, init() 初始化数据库… 文章目录 一、创建核心类1, 交换机2, 交换机类型3, 队列4, 绑定5, 交换机转发 绑定规则6, 消息7, 消息属性 二、数据库设计1, 使用 SQLite2, 使用 MyBatis2.1, 创建 Interface2.2, 创建 xml 文件 三、硬盘管理 -- 数据库1, 创建 DataBaseManager 类2, init() 初始化数据库3, insertDefaultData() 插入默认数据4, createTable() 创建数据表5, isDBExists() 数据库是否存在6, deleteTables() 删除数据表7, 封装数据库的增删查操作 四、小结 创建 Spring Boot 项目, Spring Boot 2 系列版本, Java 8 , 引入 MyBatis, Lombok 依赖 提示是正在努力进步的小菜鸟一只如有大佬发现文章欠佳之处欢迎批评指点~ 废话不多说直接上干货
整体目录结构 :
本文主要实现 server 包 一、创建核心类
上篇文章 分析了项目需求, 介绍了项目中重要的核心概念和核心 API, 以及重要板块
一个消息队列中需要的交换机, 队列, 绑定, 消息等核心概念, 以面向对象的思想, 在server.core 包下创建出来对应的类 1, 交换机
Data
public class Exchange {// 身份标识(唯一, RabbitMQ 就是以 name 作为身份标识的)private String name;// 三种交换机类型private ExchangeTypeEnum type ExchangeTypeEnum.DIRECT;// 是否需要持久化存储private boolean durable false;// 是否(交换机没人使用时)自动删除 ------------------先不实现private boolean autoDelete false;// 创建交换机时, 指定的参数选项 ------------------先不实现// 数据库中存储 String 类型, 需要序列化private MapString, Object arguments new HashMap();/*** 实现序列化, 修改 getter()和 setter(), 供数据库使用*/public String getArguments(){ObjectMapper objectMapper new ObjectMapper();// 序列化 往数据库里写try {return objectMapper.writeValueAsString(arguments);} catch (JsonProcessingException e) {e.printStackTrace();}return {};}public void setArguments(String arguments) {ObjectMapper objectMapper new ObjectMapper();// 反序列化 从数据库里读try {this.arguments objectMapper.readValue(arguments, new TypeReferenceHashMapString, Object() {});} catch (JsonProcessingException e) {e.printStackTrace();}}/*** 便于 测试/ 代码内部调用 时使用*/public Object getArguments(String key) {return arguments.get(key);}public void setArguments(String key, Object value) {arguments.put(key, value);}public void setArguments(MapString, Object arguments) {this.arguments arguments;}
}本项目未实现 autoDelete 和 arguments 2, 交换机类型
这是一个枚举类, 包含直接交换机, 扇出交换机, 主题交换机
public enum ExchangeTypeEnum {DIRECT(0),FANOUT(1),TOPIC(2);private final int type;ExchangeTypeEnum(int type) {this.type type;}public int getType() {return type;}
}3, 队列
类名不设为 Queue, 防止和标准库中的 Queue 冲突
Data
public class MessageQueue {// 唯一标识private String name;// 是否需要持久化存储private boolean durable false;// 是否为独有(如果是独有, 只能被一个消费者使用) ------------------先不实现private boolean exclusive false;// 是否(队列没人使用时)自动删除 ------------------先不实现private boolean autoDelete false;// 创建队列时, 指定的参数选项 ------------------先不实现// 数据库中存储 String 类型, 需要序列化private MapString, Object arguments new HashMap();/*** 实现序列化, 修改 getter()和 setter(), 供数据库使用*/public String getArguments(){ObjectMapper objectMapper new ObjectMapper();// 序列化 往数据库里写try {return objectMapper.writeValueAsString(arguments);} catch (JsonProcessingException e) {e.printStackTrace();}return {};}public void setArguments(String arguments) {ObjectMapper objectMapper new ObjectMapper();// 反序列化 从数据库里读try {this.arguments objectMapper.readValue(arguments, new TypeReferenceHashMapString, Object() {});} catch (JsonProcessingException e) {e.printStackTrace();}}/*** 便于 测试/ 代码内部调用 时使用*/public Object getArguments(String key) {return arguments.get(key);}public void setArguments(String key, Object value) {arguments.put(key, value);}public void setArguments(MapString, Object arguments) {this.arguments arguments;}
} 暂不实现 exclusive, autoDelete, arguments 4, 绑定
Data
public class Binding {// 绑定的消息队列标识private String queueName;// 绑定的交换机标识private String exchangeName;// 绑定的 keyprivate String bindingKey;
}bindingKey 是在创建交换机和队列的绑定时指定的, 生产者发布消息时, 需额外指定一个 routingKey 如果是直接交换机, routingKey 作为队列的唯一标识 如果是扇出交换机, routingKey 为 null, 无需使用 如果是主题交换机, routingKey 需和 bindingKey 匹配 5, 交换机转发 绑定规则
在此先不展示, 在后续文章中对应的部分再展示 防止思路混淆 6, 消息
Data
public class Message implements Serializable {// 属性private BasicProperties basicProperties new BasicProperties();// 正文private byte[] body;// 消息存储在文件中的偏移量(字节, 约定 [,) 区间 )private transient long offsetBegin 0;private transient long offsetEnd 0;// 是否合法(逻辑删除的标记, 0x1 有效, 0x0 无效)private byte isValid 0x1;// 提供工厂方法, 封装 Message 类的创建过程public static Message createMessage(String routingKey, BasicProperties basicProperties, byte[] body) {Message message new Message();if (basicProperties ! null) {message.setBasicProperties(basicProperties);}message.setMessageId(M$ UUID.randomUUID().toString().replace(-, ));message.setRoutingKey(routingKey);message.setBody(body);return message;}// 下面这些方法是为了封装 basicProperties 中的 getter()和 setter()public String getMessageId() {return basicProperties.getMessageId();}public void setMessageId(String id) {basicProperties.setMessageId(id);}public String getRoutingKey() {return basicProperties.getRoutingKey();}public void setRoutingKey(String key) {basicProperties.setRoutingKey(key);}public int getDeliverMode() {return basicProperties.getDeliverMode();}public void setDeliverMode(int value) {basicProperties.setDeliverMode(value);}
}Message 需要实现 Serializable 接⼝. 后续需要把 Message 写⼊⽂件以及进⾏⽹络传输.basicProperties 是消息的属性信息. body 是消息体.offsetBeg 和 offsetEnd 表⽰消息在消息⽂件中所在的起始位置和结束位置. 这⼀块具体的设计后⾯再详细介绍. 使⽤ transient 关键字避免属性被序列化.isValid ⽤来表⽰消息在⽂件中是否有效. 这⼀块具体的设计后⾯再详细介绍.createMessage() 相当于⼀个⼯⼚⽅法, ⽤来创建⼀个 Message 实例. messageId 通过UUID 的⽅式⽣成. 文件中的数据不相当于一个顺序表, 如果要真正删除一条消息, 是不是需要把后面的数据整体往前挪动? 这无疑是个低效操作, 因此, 对于 “删除消息” 这种高频操作, 逻辑删除显然是更优解, 但也不能让消息无限制的堆在文件中, 所以后面会参考 JVM 的 GC , 自主实现清理文件的功能 7, 消息属性
这个类作为Message 类的 引用类型的成员属性, 也需要实现 Serializable 接⼝, 否则 message 对象不能被序列化
Data
public class BasicProperties implements Serializable {// 消息的唯一标识(UUID)private String messageId;// 和 bindingKey 匹配(如果交换机为 DIRECT, 该值就是队列名, 如果交换机为 FANOUT, 该值为 null )private String routingKey;// 是否要消息持久化( RabbitMQ 就是使用 1 表示不持久化, 2 表示持久化)private int deliverMode 2;// ... 其他属性暂不考虑
}二、数据库设计 1, 使用 SQLite
对于 Exchange, MSGQueue, Binding, 我们使⽤数据库进⾏持久化保存.
此处我们使⽤的数据库是 SQLite, 是⼀个更轻量的数据库
SQLite 只是⼀个动态库(当然, 官⽅也提供了可执⾏程序 exe), 我们在 Java 中直接引⼊ SQLite 依赖, 即可直接使⽤, 不必安装其他的软件. MySQL 是一个客户端服务器结构的程序, SQLite 相当于直接操作本地的硬盘文件 在pom.xml文件中的 “dependencies” 标签中拷贝 : dependencygroupIdorg.xerial/groupIdartifactIdsqlite-jdbc/artifactIdversion3.41.0.1/version/dependency在 resource 目录下创建 application.yml 文件配置SQLite数据源, 拷贝:
spring:datasource:url: jdbc:sqlite:./data/meta.db # 注意这个路径username:# 不需要password:# 不需要driver-class-name: org.sqlite.JDBCmybatis:mapper-locations: classpath:mapper/**Mapper.xml数据库文件的位置就是 ./data/meta.db, 数据库的数据就在这里 2, 使用 MyBatis
实现 mapper 包 2.1, 创建 Interface
在 server.mapper 包下定义一个 MetaMapper 接口, 需要提供 交换机, 队列, 绑定 的建表, 插入, 删除 , 查询的 API (抽象方法) 不需要使用sql 语句建库, 创建出 ./data/meta.db 这个文件就相当于建库了, 后面再写创建文件的操作 Mapper
public interface MetaMapper {/*** 建表*/void createExchangeTable();void createQueueTable();void createBindingTable();/*** exchange 表*/void insertExchange(Exchange exchange);void deleteExchange(String exchangeName);ListExchange selectAllExchanges();/*** queue 表*/void insertQueue(MessageQueue queue);void deleteQueue(String queueName);ListMessageQueue selectAllQueues();/*** binding 表*/void insertBinding(Binding binding);void deleteBinding(Binding binding);ListBinding selectAllBindings();
}2.2, 创建 xml 文件
在 resource 目录下, 新建一个 mapper 包, 创建 MetaMapper.xml 文件, 在这个文件中编写 sql 语句, 实现上述在 MetaMapper 接口中的抽象方法
在 xml 文件中拷贝:
?xml version1.0 encodingUTF-8?
!DOCTYPE mapper PUBLIC -//mybatis.org//DTD Mapper 3.0//EN http://mybatis.org/dtd/mybatis-3-mapper.dtd
mapper namespacecom.example.demo.server.mapper.MetaMapper/mapper其中 namespace 这个字段的值要对应刚才定义的 MetaMapper 接口 的路径 建表(使用 update 标签即可) update idcreateExchangeTablecreate table if not exists exchange (name varchar(50) primary key,type int,durable boolean,autoDelete boolean,arguments varchar(1024));/updateupdate idcreateQueueTablecreate table if not exists queue (name varchar(50) primary key,durable boolean,exclusive boolean,autoDelete boolean,arguments varchar(1024));/updateupdate idcreateBindingTablecreate table if not exists binding (exchangeName varchar(50),queueName varchar(50),bindingKey varchar(256));/updateexchange 表的增删查的 sql insert idinsertExchange parameterTypecom.example.demo.server.core.Exchangeinsert into exchange values(#{name}, #{type}, #{durable}, #{autoDelete}, #{arguments});/insertselect idselectAllExchanges resultTypecom.example.demo.server.core.Exchangeselect * from exchange;/selectdelete iddeleteExchange parameterTypejava.lang.Stringdelete from exchange where name #{exchangeName};/deletequeue 表的增删查的 sql
insert idinsertQueue parameterTypecom.example.demo.server.core.MessageQueueinsert into queue values(#{name}, #{durable}, #{exclusive}, #{autoDelete}, #{arguments});/insertselect idselectAllQueues resultTypecom.example.demo.server.core.MessageQueueselect * from queue;/selectdelete iddeleteQueue parameterTypejava.lang.Stringdelete from queue where name #{queueName};/deletequeue 表的增删查的 sql insert idinsertBinding parameterTypecom.example.demo.server.core.Bindinginsert into binding values(#{exchangeName}, #{queueName}, #{bindingKey});/insertselect idselectAllBindings resultTypecom.example.demo.server.core.Bindingselect * from binding;/selectdelete iddeleteBinding parameterTypecom.example.demo.server.core.Bindingdelete from binding where exchangeName #{exchangeName} and queueName #{queueName};/delete三、硬盘管理 – 数据库
实现 datacenter 包中的 DataBaseManager 类 datacenter 这个包中整合硬盘上的数据管理 内存上的数据管理 硬盘上的数据管理又整合了 数据库中的数据管理 文件中的数据管理 1, 创建 DataBaseManager 类
成员属性需要 MetaMapper 的对象, 用来封装刚才编写的数据库的 API
但并不使用 SpringBoot 的依赖注入 (AutoWired), 而是使用以来查找的方式获取到 metaMapper
public class DataBaseManager {private MetaMapper metaMapper;}在启动类中初始化容器
SpringBootApplication
public class DemoApplication {public static ConfigurableApplicationContext context;public static void main(String[] args) throws IOException {context SpringApplication.run(DemoApplication.class, args);}
}2, init() 初始化数据库
对于 DataBaseManager 类的初始化工作, 不仅仅是对成员属性的初始化, 而是需要一些额外的业务逻辑, 这种情况就不使用构造方法了, 而是单独定义一个方法
初始化工作: metaMapper 建库建表 插入默认数据 如果是第一次启动服务器, 没有数据库则建库建表 如果是重启服务器, 已有数据库则不做处理 MyBatis 在第一次创建数据表的时候就会创建出 ./data/meta.db 这个文件, 但前提是要有 ./data 这个目录, 所以要先手动创建 public void init() {this.metaMapper DemoApplication.context.getBean(MetaMapper.class);if(!isDBExists()) {// 创建目录File file new File(./data);file.mkdirs();// 创建数据表createTable();// 插入数据insertDefaultData();}}3, insertDefaultData() 插入默认数据
创建一个默认的交换机(直接交换机) public void insertDefaultData() {Exchange exchange new Exchange();exchange.setName();exchange.setType(ExchangeTypeEnum.DIRECT);exchange.setDurable(false);exchange.setAutoDelete(false);metaMapper.insertExchange(exchange);}4, createTable() 创建数据表 public void createTable() {metaMapper.createExchangeTable();metaMapper.createQueueTable();metaMapper.createBindingTable();}5, isDBExists() 数据库是否存在
SQLite 是一个轻量级数据库, 操作 SQLite 相当于操作本地的硬盘文件, 所以检查数据库是否存在就是检查数据库文件是否存在 public boolean isDBExists() {File file new File(./data/meta.db);return file.exists();}6, deleteTables() 删除数据表
同上, 删除数据库就是删除文件, 先删文件再删目录 public void deleteTables(){File file new File(./data/meta.db);file.delete();File dir new File(./data);dir.delete();}7, 封装数据库的增删查操作
public void insertExchange(Exchange exchange) {metaMapper.insertExchange(exchange);}public ListExchange selectAllExchanges() {return metaMapper.selectAllExchanges();}public void deleteExchange(String exchangeName) {metaMapper.deleteExchange(exchangeName);}public void insertQueue(MessageQueue queue) {metaMapper.insertQueue(queue);}public ListMessageQueue selectAllQueues() {return metaMapper.selectAllQueues();}public void deleteQueue(String queueName) {metaMapper.deleteQueue(queueName);}public void insertBinding(Binding binding) {metaMapper.insertBinding(binding);}public ListBinding selectAllBindings() {return metaMapper.selectAllBindings();}public void deleteBinding(Binding binding) {metaMapper.deleteBinding(binding);}四、小结
本文主要实现了两点 :
1, 根据面向对象思想, 创建出了交换机, 队列, 绑定, 消息, 等核心概念的类2, 持久化存储 -- 硬盘管理 -- 数据库 2.1, 数据库设计, 使用 SQLite, 并结合 MyBatis 编写了交换机, 队列, 绑定的建表, 增, 删, 查的 sql 2.2, 使用 DataBaseManager 这个类管理数据库中的数据, 因为仅仅有sql语句不足以支撑所有的业务逻辑, 还需要对数据库的初始化, 判断存在, 删除等做进一步的封装
篇幅有限, 目前为止, 持久化存储 -- 硬盘管理 -- 文件 这个板块还没实现
下篇会实现消息在文件上的存储( 文件管理 : MessageFileManager 类)