当前位置: 首页 > news >正文

响应式网站的服务麦德龙网站建设目标

响应式网站的服务,麦德龙网站建设目标,企业手机建站系统,手机端竞价恶意点击#x1f34a; Java学习#xff1a;Java从入门到精通总结 #x1f34a; 深入浅出RocketMQ设计思想#xff1a;深入浅出RocketMQ设计思想 #x1f34a; 绝对不一样的职场干货#xff1a;大厂最佳实践经验指南 #x1f4c6; 最近更新#xff1a;2023年3月4日 #x1… Java学习Java从入门到精通总结 深入浅出RocketMQ设计思想深入浅出RocketMQ设计思想 绝对不一样的职场干货大厂最佳实践经验指南 最近更新2023年3月4日 个人简介通信工程本硕 for NJU、Java程序员。做过科研paper发过专利优秀的程序员不应该只是CRUD 点赞 收藏 ⭐留言 都是我最大的动力 文章目录DefaultMessageStoreCommitLogMappedFileQueueMappedFilesubmitFlushRequestDefaultMessageStore 接上文SendMessageProcessor对象接收到消息之后会把消息变成存储对象DefaultStoreMessage实例 DefaultMessageStore的默认存储消息的方法asyncPutMessage如下 Override public CompletableFuturePutMessageResult asyncPutMessage(MessageExtBrokerInner msg) {PutMessageStatus checkStoreStatus this.checkStoreStatus();if (checkStoreStatus ! PutMessageStatus.PUT_OK) {return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));}PutMessageStatus msgCheckStatus this.checkMessage(msg);if (msgCheckStatus PutMessageStatus.MESSAGE_ILLEGAL) {return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));}long beginTime this.getSystemClock().now();CompletableFuturePutMessageResult putResultFuture this.commitLog.asyncPutMessage(msg);putResultFuture.thenAccept((result) - {long elapsedTime this.getSystemClock().now() - beginTime;if (elapsedTime 500) {log.warn(putMessage not in lock elapsed time(ms){}, bodyLength{}, elapsedTime, msg.getBody().length);}this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);if (null result || !result.isOk()) {this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();}});return putResultFuture; }整个存储流程的代码还是比较清晰的 先判断消息是否投放成功 及 检查消息的格式上述检查都没问题时就会把消息存储在CommitLog里 CompletableFuturePutMessageResult putResultFuturethis.commitLog.asyncPutMessage(msg);CommitLog Broker接收到消息后最终消息存储在Commitlog对象中调用的是CommitLog的putMessage方法 public CompletableFuturePutMessageResult asyncPutMessage(final MessageExtBrokerInner msg) {// Set the storage timemsg.setStoreTimestamp(System.currentTimeMillis());// Set the message body BODY CRC (consider the most appropriate setting// on the client)msg.setBodyCRC(UtilAll.crc32(msg.getBody()));// Back to ResultsAppendMessageResult result null;StoreStatsService storeStatsService this.defaultMessageStore.getStoreStatsService();String topic msg.getTopic();int queueId msg.getQueueId();// 延时消息处理事务为TRANSACTION_PREPARED_TYPE 和 TRANSACTION_ROLLBACK_TYPE 消息不支持延时投递final int tranType MessageSysFlag.getTransactionValue(msg.getSysFlag());if (tranType MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Deliveryif (msg.getDelayTimeLevel() 0) {if (msg.getDelayTimeLevel() this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}// 存储消息时延时消息进入SCHEDULE_TOPIC_XXXX的主题下topic TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;// 消息队列编号 延迟级别 - 1queueId ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueIdMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));msg.setTopic(topic);msg.setQueueId(queueId);}}InetSocketAddress bornSocketAddress (InetSocketAddress) msg.getBornHost();if (bornSocketAddress.getAddress() instanceof Inet6Address) {msg.setBornHostV6Flag();}InetSocketAddress storeSocketAddress (InetSocketAddress) msg.getStoreHost();if (storeSocketAddress.getAddress() instanceof Inet6Address) {msg.setStoreHostAddressV6Flag();}PutMessageThreadLocal putMessageThreadLocal this.putMessageThreadLocal.get();PutMessageResult encodeResult putMessageThreadLocal.getEncoder().encode(msg);if (encodeResult ! null) {return CompletableFuture.completedFuture(encodeResult);}msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);PutMessageContext putMessageContext new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));long elapsedTimeInLock 0;MappedFile unlockMappedFile null;// 加锁同一时刻只能有一个线程put消息putMessageLock.lock(); //spin or ReentrantLock ,depending on store configtry {MappedFile mappedFile this.mappedFileQueue.getLastMappedFile();long beginLockTimestamp this.defaultMessageStore.getSystemClock().now();this.beginTimeInLock beginLockTimestamp;// Here settings are stored timestamp, in order to ensure an orderly// globalmsg.setStoreTimestamp(beginLockTimestamp);// 只有不存在映射文件或文件已存满才进行创建if (null mappedFile || mappedFile.isFull()) {mappedFile this.mappedFileQueue.getLastMappedFile(0); }if (null mappedFile) {log.error(create mapped file1 error, topic: msg.getTopic() clientAddr: msg.getBornHostString());beginTimeInLock 0;return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));}// 追加消息至MappedFile的缓存中更新写入位置wrotePositionresult mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);switch (result.getStatus()) {case PUT_OK:break;// 当文件剩余空间不足时创建新的MappedFile并写入case END_OF_FILE: unlockMappedFile mappedFile;// Create a new file, re-write the messagemappedFile this.mappedFileQueue.getLastMappedFile(0);if (null mappedFile) {// XXX: warn and notify melog.error(create mapped file2 error, topic: msg.getTopic() clientAddr: msg.getBornHostString());beginTimeInLock 0;return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));}result mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);break;case MESSAGE_SIZE_EXCEEDED:case PROPERTIES_SIZE_EXCEEDED:beginTimeInLock 0;return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));case UNKNOWN_ERROR:beginTimeInLock 0;return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));default:beginTimeInLock 0;return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));}elapsedTimeInLock this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;beginTimeInLock 0;} finally {// 释放锁putMessageLock.unlock();}/** log **/if (null ! unlockMappedFile this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {this.defaultMessageStore.unlockMappedFile(unlockMappedFile);}PutMessageResult putMessageResult new PutMessageResult(PutMessageStatus.PUT_OK, result);// StatisticsstoreStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());CompletableFuturePutMessageStatus flushResultFuture submitFlushRequest(result, msg);CompletableFuturePutMessageStatus replicaResultFuture submitReplicaRequest(result, msg);// 异步刷盘流程return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) - {if (flushStatus ! PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(flushStatus);}if (replicaStatus ! PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(replicaStatus);if (replicaStatus PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {log.error(do sync transfer other node, wait return, but failed, topic: {} tags: {} client address: {},msg.getTopic(), msg.getTags(), msg.getBornHostNameString());}}return putMessageResult;});}消息在CommitLog文件中是顺序存储的 RocketMQ消息存储在CommitLog文件里最终落盘对应的类为MappedFile它是从MappedFileQueue中获取的如果对象不存在就会创建 MappedFile mappedFile this.mappedFileQueue.getLastMappedFile();// 只有不存在映射文件或文件已存满才进行创建 if (null mappedFile || mappedFile.isFull()) {mappedFile this.mappedFileQueue.getLastMappedFile(0); }创建获取完成对象之后就会把消息插入到mappedFile里如果文件放不下了则会重新创建一个mappedFile来对其进行写入最后就是使用异步Future的方式把消息持久化到磁盘上。 MappedFileQueue 上面的源码里首先从MappedFileQueue映射队列尾部中获取MappedFile对象 public MappedFile getLastMappedFile() {MappedFile mappedFileLast null;while (!this.mappedFiles.isEmpty()) {try {mappedFileLast this.mappedFiles.get(this.mappedFiles.size() - 1);break;} catch (IndexOutOfBoundsException e) {//continue;} catch (Exception e) {log.error(getLastMappedFile has exception., e);break;}}return mappedFileLast;}当MappedFile对象为空时表示MappedFile对象不存在此时就需要重新创建一个MappedFile对象相应的方法在MappedFileQueue里 public class MappedFileQueue {private static final InternalLogger log InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);private static final InternalLogger LOG_ERROR InternalLoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);// 批量删除文件上限private static final int DELETE_FILES_BATCH_MAX 10;// 目录private final String storePath;// 每个映射文件的大小private final int mappedFileSize;// 映射文件数组private final CopyOnWriteArrayListMappedFile mappedFiles new CopyOnWriteArrayListMappedFile();// 分配MappedFile服务private final AllocateMappedFileService allocateMappedFileService;// 最后flush到的offsetprivate long flushedWhere 0;// 最后commit到的offsetprivate long committedWhere 0;// 最后保存的时间戳private volatile long storeTimestamp 0;/*** 获取最后一个可写入的映射文件* 当最后一个文件已经满的时候创建一个新的文件*/public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {long createOffset -1;MappedFile mappedFileLast getLastMappedFile();// 不存在映射文件if (mappedFileLast null) {// 计算从哪个offset开始createOffset startOffset - (startOffset % this.mappedFileSize);}// 最后一个文件已满if (mappedFileLast ! null mappedFileLast.isFull()) {createOffset mappedFileLast.getFileFromOffset() this.mappedFileSize;}// 创建文件if (createOffset ! -1 needCreate) {String nextFilePath this.storePath File.separator UtilAll.offset2FileName(createOffset);String nextNextFilePath this.storePath File.separator UtilAll.offset2FileName(createOffset this.mappedFileSize);MappedFile mappedFile null;if (this.allocateMappedFileService ! null) {mappedFile this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,nextNextFilePath, this.mappedFileSize);} else {try {mappedFile new MappedFile(nextFilePath, this.mappedFileSize);} catch (IOException e) {log.error(create mappedFile exception, e);}}if (mappedFile ! null) {if (this.mappedFiles.isEmpty()) {mappedFile.setFirstCreateInQueue(true);}this.mappedFiles.add(mappedFile);}return mappedFile;}return mappedFileLast;}public MappedFile getLastMappedFile(final long startOffset) {return getLastMappedFile(startOffset, true);} }当获取到的“最后一个”MappedFile不存在或文件已满时则新建一个并计算新文件的createOffset MappedFile 在CommitLog的代码中最终把消息追加到MappedFile文件的缓冲区中同时更新其写入位置writePosition但是还没有刷盘 result mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);其中调用了MappedFile的appendMessage方法实现添加消息到消息映射文件 public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,PutMessageContext putMessageContext) {assert messageExt ! null;assert cb ! null;int currentPos this.wrotePosition.get();if (currentPos this.fileSize) {// 缓冲区ByteBuffer byteBuffer writeBuffer ! null ? writeBuffer.slice() : this.mappedByteBuffer.slice();byteBuffer.position(currentPos);AppendMessageResult result;// 判断是否是批量操作if (messageExt instanceof MessageExtBrokerInner) {result cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,(MessageExtBrokerInner) messageExt, putMessageContext);} else if (messageExt instanceof MessageExtBatch) {result cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,(MessageExtBatch) messageExt, putMessageContext);} else {return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);}// 更新写入位置 写入偏移量this.wrotePosition.addAndGet(result.getWroteBytes());this.storeTimestamp result.getStoreTimestamp();return result;}log.error(MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}, currentPos, this.fileSize);return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);}通过源码可以看到实际上是吧消息放入ByteBuffer同时更新写入位置和偏移量 submitFlushRequest 提交刷盘请求的源码如下 public CompletableFuturePutMessageStatus submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {// 异步flushif (FlushDiskType.SYNC_FLUSH this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {final GroupCommitService service (GroupCommitService) this.flushCommitLogService;if (messageExt.isWaitStoreMsgOK()) {GroupCommitRequest request new GroupCommitRequest(result.getWroteOffset() result.getWroteBytes(),this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());service.putRequest(request);return request.future();} else {service.wakeup();return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);}}// 同步flushelse {if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {flushCommitLogService.wakeup(); // 异步使用MappedByteBuffer默认策略} else {commitLogService.wakeup(); // 异步使用字节缓冲区 FileChannel}return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);}}CommitLog文件刷盘是由FlushCommitLogService服务具体执行一共有两种刷盘策略 FlushDiskType.SYNC_FLUSH同步刷盘FlushDiskType.ASYNC_FLUSH异步刷盘 同步刷盘的情况下必须要等到数据刷盘成功后才会有返回结果如果是异步刷盘只需要把消息放入内存之后即可返回。 上述策略可以在broker.conf配置文件中进行配置
http://www.w-s-a.com/news/839655/

相关文章:

  • 做国外单的网站叫什么海南省海口市网站建设
  • 杭州响应式网站案例wordpress5.2.2
  • 网站建设运营维护合同wordpress资源搜索插件
  • 国外网站流量查询东莞网站建设教程
  • 餐饮类网站建设达到的作用东莞工程建设交易中心网
  • 网站设计 知识产权湖北网站建设xiduyun
  • 猫咪网站模版下载中国风 古典 红色 网站源代码
  • 个人网站备案模板制作网站首页
  • 潍坊正规建设网站网站建设设计作业
  • 推荐一下网站谢谢辽宁住房城乡建设部官方网站
  • 网站文件大小英选 网站开发
  • 济南建网站哪家好wordpress编辑器排行
  • 在福州做搬家网站多少钱画册设计网站有哪些
  • 如何让别人浏览我做的网站哪些方法可以建设网站
  • 网站建设与管理网络推广的优点
  • 美食网站的设计与制作做网站的电销话术
  • 中国档案网站建设现状研究陕西建设厅执业资格注册中心网站
  • 网站建设的内容管理怎么用ps切片在dw里做网站
  • 建设婚恋网站用什么搭建涿州网站开发
  • 做知识内容的网站与app哈尔滨哪里有做网站的
  • 青岛企业网站建站模板百度网站建设推广
  • 做360网站中保存的图片存在哪里个人建立网站要多少钱
  • 网站安装部署无锡做网站的公司
  • 怎么将网站做成小程序安装wordpress到服务器
  • 企业网站建设的四大因素沈阳网站建设招标公司
  • wordpress仿站开发公司网站策划宣传
  • 金乡县网站开发网站开发三个流程
  • qq空间网站是多少纺织网站建设方案
  • 建设微网站项目报告网站优化难吗
  • 做网站需要自己上传产品吗企业网站系统设计