官方网站建设的目标,自己搭建服务器做网站要多久,wordpress 添加水印,长沙市网站制作哪家专业前言
RocketMQ是阿里开发的一个高性能的消息队列#xff0c;支持各种消息类型#xff0c;而且支持事务消息#xff0c;可以说是现在的很多系统中的香饽饽了#xff0c;所以呢#xff0c;怎么使用大家肯定是要学习的
我们作为一个有梦想的程序员#xff0c;在学习一门技…
前言
RocketMQ是阿里开发的一个高性能的消息队列支持各种消息类型而且支持事务消息可以说是现在的很多系统中的香饽饽了所以呢怎么使用大家肯定是要学习的
我们作为一个有梦想的程序员在学习一门技术的时候肯定是不能光知其然这是远远不够的我们必须要知其所以然这样才能在面试的时候侃侃而谈啊呸不对这样我们才能在工作中遇到问题的时候理性的去思考如何解决问题
我们知道RocketMQ的架构是producer、NameServer、broker、Consumerproducer是生产消息的NameServer是路由中心负责服务的注册发现以及路由管理这些。
Consumer是属于消费消息的broker则属于真正的存储消息以及进行消息的持久化也就是存储消息的文件和索引消息的文件都在broker上
消息队列的主要作用是解耦异步削峰也就意味着消息队列中的存储功能是必不可少的而随着时代的发展业务量的增加也对消息队列的存储功能的强度的要求越来越高了
也就是说你不能光性能好你得存储的消息也得足够支撑我的业务量你只能存储100MB的消息我这系统每分钟的消息业务量可能500MB了那肯定不够使啊那还削个啥的峰啊峰来了你自己都顶不住 RocketMQ凭借其强大的存储能力和强大的消息索引能力以及各种类型消息和消息的特性脱颖而出于是乎我们这些有梦想的程序员学习RocketMQ的存储原理也变得尤为重要
而要说起这个存储原理则不得不说的就是RocketMQ的消息存储文件commitLog文件消费方则是凭借着巧妙的设计Consumerqueue文件来进行高性能并且不混乱的消费还有RocketMQ的强大的支持消息索引的特性靠的就是indexfile索引文件
我们这篇文章就从这commitLog、Consumerqueue、indexfile这三个神秘的文件说起搞懂这三个文件RocketMQ的核心就被你掏空了
先上个图写入commitLog文件时commitLog和Consumerqueue、indexfile文件三者的关系 Commitlog文件
大小和命名规则
RocketMQ中的消息存储文件放在${ROCKET_HOME}/store 目录下当生产者发送消息时broker会将消息存储到Commit文件夹下文件夹下面会有一个commitLog文件但是并不是意味着这个文件叫这个文件命名是根据消息的偏移量来决定的 文件有自己的生成规则每个commitLog文件的大小是1G一般情况下第一个 CommitLog 的起始偏移量为 0第二个 CommitLog 的起始偏移量为 1073741824 1G 1073741824byte。
也正是因为该文件的文件名字规则所以也可以更好的知道消息处于哪个文件中假设物理偏移量是1073741830则相对的偏移量是66 1073741830 - 1073741824于是判断出该消息位于第二个commitLog文件上下面要说的Consumerqueue文件和indexfile文件都是通过偏移量来计算出消息位于哪个文件进行更为精准的定位减少了IO次数
文件存储规则和特点
commitLog文件的最大的一个特点就是消息的顺序写入随机读写关于commitLog的文件的落盘有两种一种是同步刷盘一种是异步刷盘可通过 flushDiskType 进行配置
在写入commitLog的时候内部会有一个mappedFile内存映射文件消息是先写入到这个内存映射文件中然后根据刷盘策略写到硬盘中对于producer的角度来说就是同步就是当消息真正的写到硬盘的时候才会给producer返回成功而异步就是当消息到达内存的时候就返回成功了然后异步的去刷盘
跑题了最大的特点顺序写入所有的topic的消息都存储到commitLog文件中顺序写入可以充分的利用磁盘顺序减少了IO争用数据存储的性能kafka也是通过硬盘顺序存盘的
大家都常说硬盘的速度比内存慢其实这句话也是有歧义的当硬盘顺序写入和读取的时候速度不比内存慢甚至比内存速度快这种存储方式就好比数组我们如果知道数组的下标则可以直接通过下标计算出位置找到内存地址众所周知数组的读取是很快的但是数组的缺点在于插入数据比较慢因为如果在中间插入数据需要将后面的数据往后移动
而对于数组来说如果我们只会顺序的往后添加数组的速度也是很快的因为数组没有后续的数据的移动这一操作很耗时
回到RocketMQ中的commitLog文件也是同样的道理顺序的写入文件也就不需要太多的去考虑写入的位置直接找到文件往后放就可以了而取数据的时候也是和数组一样我们可以通过文件的大小去精准的定位到哪一个文件然后再精准的定位到文件的位置 当然至于这个索引位置就是靠下面的Consumerqueue文件和indexfile文件来找到消息的位置的也就是索引地址
哦对了数组的元素大小是一样的并不意味这commitLog文件的各个消息存储空间一样
简单看下源码
这部分源码在DefaultMessageStore.putMessage
Overridepublic PutMessageResult putMessage(MessageExtBrokerInner msg) {if (this.shutdown) {log.warn(message store has shutdown, so putMessage is forbidden);return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);}// 从节点不允许写入if (BrokerRole.SLAVE this.messageStoreConfig.getBrokerRole()) {long value this.printTimes.getAndIncrement();if ((value % 50000) 0) {log.warn(message store is slave mode, so putMessage is forbidden );}return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);}// store是否允许写入if (!this.runningFlags.isWriteable()) {long value this.printTimes.getAndIncrement();if ((value % 50000) 0) {log.warn(message store is not writeable, so putMessage is forbidden this.runningFlags.getFlagBits());}return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);} else {this.printTimes.set(0);}// topic过长if (msg.getTopic().length() Byte.MAX_VALUE) {log.warn(putMessage message topic length too long msg.getTopic().length());return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);}// 消息附加属性过长if (msg.getPropertiesString() ! null msg.getPropertiesString().length() Short.MAX_VALUE) {log.warn(putMessage message properties length too long msg.getPropertiesString().length());return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);}if (this.isOSPageCacheBusy()) {return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);}long beginTime this.getSystemClock().now();// 添加消息到commitLogPutMessageResult result this.commitLog.putMessage(msg);long eclipseTime this.getSystemClock().now() - beginTime;if (eclipseTime 500) {log.warn(putMessage not in lock eclipse time(ms){}, bodyLength{}, eclipseTime, msg.getBody().length);}this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);if (null result || !result.isOk()) {this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();}return result;}中间的commitLog.putMessage就是负责实现消息写入commitLog文件这个太长了我就不给大家截了
大致流程就是组装消息放入属性然后通过MappedFile对象写入文件紧接着根据刷盘策略刷盘最后进行主从同步
consumerQueue文件
RocketMQ是分为多个topic消息所属主题属于消息类型每一个topic有多个queue每个queue放着不同的消息在同一个消费者组下的消费者可以同时消费同一个topic下的不同queue队列的消息。
不同消费者下的消费者可以同时消费同一个topic下的相同的队列的消息。而同一个消费者组下的消费者不可以同时消费不同topic下的消息
而每个topic下的queue队列都会对应一个Consumerqueue文件例如Topic中有三个队列每个队列中的消息索引都会有一个编号编号从0开始往上递增。
并由此一个位点offset的概念有了这个概念就可以对Consumer端的消费情况进行队列定义。
消息消费完成后需要将消费进度存储起来即前面提到的offset。
广播模式下同消费组的消费者相互独立消费进度要单独存储
集群模式下同一条消息只会被同一个消费组消费一次消费进度会参与到负载均衡中故消费进度是需要共享的。
消费进度也就是由Broker管理每一个消费者消费Topic的进度包含正常提交消费进度和重置消费进度消费进度管理的目的是保证消费者在正常运行状态、重启、异常关闭等状态下都能准确续接“上一次”未处理的消息。
在RocketMQ中实现的消费语义叫“至少投递一次”也就是所有的消息至少有一次机会消费不用担心会丢消息。用户需要实现消费幂等来避免重复投递对业务实际数据的影响。
幂等是啥应该不用我多说了吧亲爱的你们肯定知道了 如上图所示消费者一般在两种情况下“上报”消费进度消费成功后(包含正常消费成功、重试消费成功)和重置消费进度。
而消费进度的标准就是Consumerqueue文件这个文件中存储的是投递到某一个messagequeue中的位置信息
比如我们知道消息存储到commitLog文件中一个消费者A对应着消费messagequeueA这个队列但是无法确定在commitLog文件中该队列中的消息的位置于是就有了ConsumerqueueA这个文件这个文件对应一个messagequeueA消费者A便可以通过ConsumerqueueA来确定自己的消费进度获取消息在commitLog文件中的具体的offset和大小
存放位置和结构
consumequeue存放在store文件里面里面的consumequeue文件里面按照topic排放然后每个topic默认4个队列里面存放的consumequeue文件
ConsumeQueue中并不需要存储消息的内容而存储的是消息在CommitLog中的offset。也就是说ConsumeQueue其实是CommitLog的一个索引文件。
consumequeue是定长结构每个记录固定大小20个字节单个consumequeue文件默认包含30w个条目所以单个文件大小大概6M左右 很显然Consumer消费消息的时候要读2次先读ConsumeQueue得到offset再通过offset找到CommitLog对应的消息内容。
ConsumeQueue的作用
消费者通过broker保存的offsetoffsetTable.offset json文件中保存的ConsumerQueue的下标可以在ConsumeQueue中获取消息从而快速的定位到commitLog的消息位置由于每个消息的大小是不一样的也可以通过size获取到消息的大小从而读取完整的消息
过滤tag是也是通过遍历ConsumeQueue来实现的先比较hash(tag)符合条件的再到具体消息比较tag
offsetTable.offset
和commitLog的offset不是一回事这个offset是ConsumeQueue文件的已经消费的下标/行数可以直接定位到ConsumeQueue并找到commitlogOffset从而找到消息体原文。这个offset是消息消费进度的核心不同的消费模式保存地址不同
广播模式DefaultMQPushConsumer的BROADCASTING模式各个Consumer没有互相干扰使用LoclaFileOffsetStore把Offset存储在Consumer本地
集群模式DefaultMQPushConsumer的CLUSTERING模式由Broker端存储和控制Offset的值使用RemoteBrokerOffsetStore
简单看下构建过程
在Broker中构建ComsummerQueue不是存储完CommitLog就马上同步构建的而是通过一个线程任务异步的去做这个事情。在DefaultMessageStore中有一个ReputMessageService成员它就是负责构建ComsumerQueue的任务线程。
ReputMessageService继承自ServiceThread表明其是一个服务线程它的run方法很简单如下所示 public void run() {while (!this.isStopped()) {try {Thread.sleep(1);this.doReput(); // 构建ComsumerQueue} catch (Exception e) {DefaultMessageStore.log.warn(this.getServiceName() service has exception. , e);}}}在run方法里每休息1毫秒就进行一次构建ComsumerQueue的动作。因为必须先写入CommitLog然后才能进行ComsumerQueue的构建。那么不排除构建ComsumerQueue的速度太快了而CommitLog还没写入新的消息。这时就需要sleep下让出cpu时间片避免浪费CPU资源。
我们点进去这个doReput()看核心处理逻辑 private void doReput() {for (boolean doNext true; this.isCommitLogAvailable() doNext; ) {SelectMappedBufferResult result DefaultMessageStore.this.commitLog.getData(reputFromOffset);// 拿到所有的最新写入CommitLog的数据if (result ! null) {try {this.reputFromOffset result.getStartOffset();for (int readSize 0; readSize result.getSize() doNext; ) {DispatchRequest dispatchRequest DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); // 一条一条的读消息int size dispatchRequest.getMsgSize();if (dispatchRequest.isSuccess()) {if (size 0) {DefaultMessageStore.this.doDispatch(dispatchRequest); // 派发消息进行处理其中就包括构建ComsumerQueuethis.reputFromOffset size;readSize size;} else if (size 0) { // this.reputFromOffset DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);readSize result.getSize();}} else if (!dispatchRequest.isSuccess()) { // 获取消息异常if (size 0) {log.error([BUG]read total count not equals msg total size. reputFromOffset{}, reputFromOffset);this.reputFromOffset size;} else {doNext false;if (DefaultMessageStore.this.brokerConfig.getBrokerId() MixAll.MASTER_ID) {this.reputFromOffset result.getSize() - readSize;}}}}} finally {result.release();}} else {doNext false;}}}我在这里省略了一些和构建ComsumerQueue不相干的代码。
其实在doReput里面就做了三件事
1、获取最新写入到CommitLog中的数据byteBuffer。
2、从byteBuffer中一条条的读取消息并派发出去处理。
3、更新reputFromOffset位移。
感兴趣的可以打断点走一遍
indexFile文件
RocketMQ还支持通过MessageID或者MessageKey来查询消息使用ID查询时因为ID就是用brokeroffset生成的这里msgId指的是服务端的所以很容易就找到对应的commitLog文件来读取消息。
对于用MessageKey来查询消息MessageStore通过构建一个index来提高读取速度
文件结构 indexfile文件存储在store目录下的index文件里面里面存放的是消息的hashcode和index内容文件由一个文件头组成长40字节。500w个hashslot每个4字节。2000w个index条目每个20字节。
所以这里我们可以估算每个indexfile的大小为40500w42000w20个字节大约400M左右
文件详细信息
IndexHeader索引文件头信息由40个字节组成 //8位 该索引文件的第一个消息(Message)的存储时间(落盘时间)
this.byteBuffer.putLong(beginTimestampIndex, this.beginTimestamp.get());
//8位 该索引文件的最后一个消息(Message)的存储时间(落盘时间)
this.byteBuffer.putLong(endTimestampIndex, this.endTimestamp.get());
//8位 该索引文件第一个消息(Message)的在CommitLog(消息存储文件)的物理位置偏移量(可以通过该物理偏移直接获取到该消息)
this.byteBuffer.putLong(beginPhyoffsetIndex, this.beginPhyOffset.get());
//8位 该索引文件最后一个消息(Message)的在CommitLog(消息存储文件)的物理位置偏移量
this.byteBuffer.putLong(endPhyoffsetIndex, this.endPhyOffset.get());
//4位 该索引文件目前的hash slot的个数
this.byteBuffer.putInt(hashSlotcountIndex, this.hashSlotCount.get());
//4位 索引文件目前的索引个数
this.byteBuffer.putInt(indexCountIndex, this.indexCount.get());Slot槽位默认每个文件配置的slot是500万个每个slot是4位的整型数据Slot每个节点保存当前已经拥有多少个index数据了 //slot的数据存放位置 40 keyHash %500W* 4
int absSlotPos IndexHeader.INDEX_HEADER_SIZE slotPos * hashSlotSize;//Slot Table
//4字节
//记录该slot当前index如果hash冲突即absSlotPos一致作为下一次该slot新增的前置index
this.mappedByteBuffer.putInt(absSlotPos,this.indexHeader.getIndexCount());索引消息内容消息长度固定为20位
//Index Linked list
//topicmessage key的hash值
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
//消息在CommitLog的物理文件地址, 可以直接查询到该消息(索引的核心机制)
this.mappedByteBuffer.putLong(absIndexPos 4, phyOffset);
//消息的落盘时间与header里的beginTimestamp的差值(为了节省存储空间如果直接存message的落盘时间就得8bytes)
this.mappedByteBuffer.putInt(absIndexPos 4 8, (int) timeDiff);
//9、记录该slot上一个index
//hash冲突处理的关键之处, 相同hash值上一个消息索引的index(如果当前消息索引是该hash值的第一个索引则prevIndex0, 也是消息索引查找时的停止条件)每个slot位置的第一个消息的prevIndex就是0的
this.mappedByteBuffer.putInt(absIndexPos 4 8 4, slotValue);再论结构
文件结构slot和indexLinkedList可以理解成java中的HashMap
哎你说HashMap我可不困了啊你可别蒙我这个我熟什么负载因子、默认大小、扩容机制、红黑树还有多线程下不安全这些
乖我知道你熟悉你跟着我一起学习这些当然了如指掌只需要你了解HashMap的结构和冲突即可 每放入一个新消息的index进来首先会取MessageKey的HashCode然后用Hashcode对slot的总数进行取模决定该消息key的位置slot的总数默认是500W个
只要取hash就必然面临着hash冲突的问题indexfile也是采用链表结构来解决hash冲突这一点和HashMap一样的不过这个不存在红黑树转换这一说个人猜测这个的冲突数量也达不到很高的级别所以进行这方面的设计也没啥必要甚至变成了强行增加indexfile的文件结构难度
还有在indexfile中的slot中放的是最新的index的指针因为一般查询的时候大概率是优先查询最近的消息
每个slot中放的指针值是索引在indexfile中的偏移量也就是后面index的位置而index中存放的就是该消息在commitlog文件中的offset每个index的大小是20字节所以根据当前索引是这个文件中的第几个偏移量也就很容易定位到索引的位置根据前面的固定大小可以很快把真实坐标算出来以此类推形成一个链表的结构
查询流程
由于indexHeaderslotindex都是固定大小所以
公式1第n个slot在indexFile中的起始位置是这样:40(n-1)*4
公式2第s个index在indexFile中的起始位置是这样:405000000*4(s-1)*20
查询的传入值除了key外还包含一个时间起始值以及截止值
为啥还要传时间范围呢
一个indexFile写完一个会继续写下一个仅仅一个key无法定位到具体的indexFile时间范围就为了更精确的定位到具体的indexFile缩小查找的范围indexFile文件名是一个时间戳根据这个日期就可以定位到传入的日期范围对应在哪个或者哪些indexFile中是不是很棒。
好了我们接着说查询流程
key–计算hash值–hash值对500万取余算出对应的slot序号–根据40(n-1)4(公式1)算出该slot在文件中的位置–读取slot值也就是index序号–根据4050000004(s-1)*20(公式2)算出该index在文件中的位置–读取该index–将key的hash值以及传入的时间范围与index的keyHash值以及timeDiff值进行比对
不满足则根据index中的preIndexNo找到上一个index继续上一步满足则根据index中的phyOffset拿到commitLog中的消息
为啥比对时还要带上时间范围呢
只比key不行吗答案是不行因为key可能会重复producer在消息生产时可以指定消息的key这个key显然无法保证唯一性那自动生成的msgId呢也不能保证唯一你可以去看看msgId的生成规则 包括当前机器IP进程号MessageClientIDSetter.class.getClassLoader()的hashCode值消息生产时间与broker启动时间的差值broker启动后从0开始单调自增的int值前面三项很明显可能重复后面两项一个是时间差一个是重启归零也可能重复 简单看下源码感兴趣的下载源码去研究
indexfile的添加消息索引的过程 public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {//1. 判断该索引文件的索引数小于最大的索引数如果最大索引数IndexService就会尝试新建一个索引文件if (this.indexHeader.getIndexCount() this.indexNum) {//2. 计算该message key的hash值int keyHash indexKeyHashMethod(key);//3. 根据message key的hash值散列到某个hash slot里int slotPos keyHash % this.hashSlotNum;//4. 计算得到该hash slot的实际文件位置Positionint absSlotPos IndexHeader.INDEX_HEADER_SIZE slotPos * hashSlotSize;try {//5. 根据该hash slot的实际文件位置absSlotPos得到slot里的值//这里有两种情况//1). slot0, 当前message的key是该hash值第一个消息索引//2). slot0, 该key hash值上一个消息索引的位置int slotValue this.mappedByteBuffer.getInt(absSlotPos);//6. 数据校验及修正if (slotValue invalidIndex || slotValue this.indexHeader.getIndexCount()) {slotValue invalidIndex;}long timeDiff storeTimestamp - this.indexHeader.getBeginTimestamp();timeDiff timeDiff / 1000;if (this.indexHeader.getBeginTimestamp() 0) {timeDiff 0;} else if (timeDiff Integer.MAX_VALUE) {timeDiff Integer.MAX_VALUE;} else if (timeDiff 0) {timeDiff 0;}//7. 计算当前消息索引具体的存储位置(Append模式)int absIndexPos IndexHeader.INDEX_HEADER_SIZE this.hashSlotNum * hashSlotSize this.indexHeader.getIndexCount() * indexSize;//8. 存入该消息索引this.mappedByteBuffer.putInt(absIndexPos, keyHash);this.mappedByteBuffer.putLong(absIndexPos 4, phyOffset);this.mappedByteBuffer.putInt(absIndexPos 4 8, (int) timeDiff);this.mappedByteBuffer.putInt(absIndexPos 4 8 4, slotValue);//9. 关键之处在该key hash slot处存入当前消息索引的位置下次通过该key进行搜索时//会找到该key hash slot - slot value - curIndex - //if(curIndex.prevIndex0) pre index (一直循环 直至该curIndex.prevIndex0就停止)this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());if (this.indexHeader.getIndexCount() 1) {this.indexHeader.setBeginPhyOffset(phyOffset);this.indexHeader.setBeginTimestamp(storeTimestamp);}this.indexHeader.incHashSlotCount();this.indexHeader.incIndexCount();this.indexHeader.setEndPhyOffset(phyOffset);this.indexHeader.setEndTimestamp(storeTimestamp);return true;} catch (Exception e) {log.error(putKey exception, Key: key KeyHashCode: key.hashCode(), e);} } else {log.warn(Over index file capacity: index count this.indexHeader.getIndexCount() ; index max num this.indexNum);}return false;}indexfile的索引搜索源码
public void selectPhyOffset(final ListLong phyOffsets, final String key, final int maxNum,final long begin, final long end, boolean lock) {if (this.mappedFile.hold()) {//1. 计算该key的hashint keyHash indexKeyHashMethod(key);//2. 计算该hash value 对应的hash slot位置int slotPos keyHash % this.hashSlotNum;//3. 计算该hash value 对应的hash slot物理文件位置int absSlotPos IndexHeader.INDEX_HEADER_SIZE slotPos * hashSlotSize;FileLock fileLock null;try {//4. 取出该hash slot 的值int slotValue this.mappedByteBuffer.getInt(absSlotPos);//5. 该slot value 0 就代表没有该key对应的消息索引,直接结束搜索// 该slot value maxIndexCount 就代表该key对应的消息索引超过最大限制数据有误,直接结束搜索if (slotValue invalidIndex || slotValue this.indexHeader.getIndexCount()|| this.indexHeader.getIndexCount() 1) {} else {//6. 从当前slot value 开始搜索for (int nextIndexToRead slotValue; ; ) {if (phyOffsets.size() maxNum) {break;}//7. 找到当前slot value(也就是index count)物理文件位置int absIndexPos IndexHeader.INDEX_HEADER_SIZE this.hashSlotNum * hashSlotSize nextIndexToRead * indexSize;//8. 读取消息索引数据int keyHashRead this.mappedByteBuffer.getInt(absIndexPos);long phyOffsetRead this.mappedByteBuffer.getLong(absIndexPos 4);long timeDiff (long) this.mappedByteBuffer.getInt(absIndexPos 4 8);//9. 获取该消息索引的上一个消息索引index(可以看成链表的prev 指向上一个链节点的引用)int prevIndexRead this.mappedByteBuffer.getInt(absIndexPos 4 8 4);//10. 数据校验if (timeDiff 0) {break;}timeDiff * 1000L;long timeRead this.indexHeader.getBeginTimestamp() timeDiff;boolean timeMatched (timeRead begin) (timeRead end);//10. 数据校验比对 hash值和落盘时间if (keyHash keyHashRead timeMatched) {phyOffsets.add(phyOffsetRead);}//当prevIndex 0 或prevIndex maxIndexCount 或prevIndexRead nextIndexToRead 或 timeRead begin 停止搜索if (prevIndexRead invalidIndex|| prevIndexRead this.indexHeader.getIndexCount()|| prevIndexRead nextIndexToRead || timeRead begin) {break;}nextIndexToRead prevIndexRead;}}} catch (Exception e) {log.error(selectPhyOffset exception , e);} finally {this.mappedFile.release();}}}结束语
感谢大家能够做我最初的读者和传播者请大家相信只要你给我一
份爱我终究会还你们一页情的。
欢迎大家关注我的公众号【左耳君】探索技术分享生活
哦对了后续所有的文章都会更新到这里
https://github.com/DayuMM2021/Java