当前位置: 首页 > news >正文

网站建设 指标自己做网站运营

网站建设 指标,自己做网站运营,中信建设有限责任公司海南分公司,网站设计成品1、Broker概述 Broker 在 RocketMQ 架构中的角色#xff0c;就是存储消息#xff0c;核心任务就是持久化消息#xff0c;生产者发送消息给 Broker,消费者从 Broker 消费消息#xff0c;其物理部署架构图如下#xff1a; 备注#xff1a;以上摘录自官方 RocketMQ 设计文档…1、Broker概述 Broker 在 RocketMQ 架构中的角色就是存储消息核心任务就是持久化消息生产者发送消息给 Broker,消费者从 Broker 消费消息其物理部署架构图如下 备注以上摘录自官方 RocketMQ 设计文档。 上述基本描述了消息中间件的架构设计不仅限于 RocketMQ,不同消息中间件的最大区别之一在消息的存储上。 2、Broker存储设计概要 接下来从配置文件的角度来窥探 Broker 存储设计的关注点对应代码MessageStoreConfig。 storePathRootDir 设置Broker的存储根目录默认为 $Broker_Home/store。storePathCommitLog 设置commitlog的存储目录默认为$Broker_Home/store/commitlog。mapedFileSizeCommitLog commitlog 文件的大小默认为1G。mapedFileSizeConsumeQueue consumeQueueSizeConsumeQueue 存放的是定长的信息20个字节偏移量、size、tagscode,默认30w * ConsumeQueue.CQ_STORE_UNIT_SIZE。enableConsumeQueueExt 是否开启 consumeQueueExt,默认为 false,就是如果消费端消息消费速度跟不上是否创建一个扩展的 ConsumeQueue文件如果不开启应该会阻塞从 commitlog 文件中获取消息并且 ConsumeQueue,应该是按topic独立的。mappedFileSizeConsumeQueueExt 扩展consume文件的大小默认为48M。flushIntervalCommitLog 刷写 CommitLog 的间隔时间RocketMQ 后台会启动一个线程将消息刷写到磁盘这个也就是该线程每次运行后等待的时间默认为500毫秒。flush 操作调用文件通道的force()方法。commitIntervalCommitLog 提交消息到 CommitLog 对应的文件通道的间隔时间原理与上面类似将消息写入到文件通道调用FileChannel.write方法得到最新的写指针默认为200毫秒。useReentrantLockWhenPutMessage 在put message( 将消息按格式封装成msg放入相关队列时实用的锁机制自旋或ReentrantLock)。flushIntervalConsumeQueue 刷写到ConsumeQueue的间隔默认为1s。flushCommitLogLeastPages 每次 flush commitlog 时最小发生变化的页数。commitCommitLogLeastPages 每一次 commitlog 提交任务至少需要的页数。flushLeastPagesWhenWarmMapedFile 用字节0填充整个文件每多少页刷盘一次默认4096异步刷盘模式生效。flushConsumeQueueLeastPages 一次刷盘至少需要的脏页数量默认为2针对 consuequeue 文件。putMsgIndexHightWater 当前版本未使用。 接下来从如下方面去深入其实现 1生产者发送消息 2消息协议格式 3消息存储、检索 4消费队列维护 5消息消费、重试等机制 2.1 消息发送 org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl sendDefaultImpl方法源码分析 rprivate SendResult sendDefaultImpl(//Message msg, // final CommunicationMode communicationMode, //final SendCallback sendCallback, //final long timeout//) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {2.1.1 消息发送参数详解 1、Message msg 2、communicationMode communicationMode 发送方式SYNC(同步)、ASYNC异步、ONEWAY(单向不关注返回) 3、SendCallback sendCallback 异步消息发送回调函数。 4、long timeout 消息发送超时时间。 2.2.2 消息发送流程 默认消息发送实现 private SendResult sendDefaultImpl(//Message msg, //final CommunicationMode communicationMode, //final SendCallback sendCallback, //final long timeout//) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);final long invokeID random.nextLong();long beginTimestampFirst System.currentTimeMillis();long beginTimestampPrev beginTimestampFirst;long endTimestamp beginTimestampFirst;TopicPublishInfo topicPublishInfo this.tryToFindTopicPublishInfo(msg.getTopic()); // 1if (topicPublishInfo ! null topicPublishInfo.ok()) {MessageQueue mq null;Exception exception null;SendResult sendResult null;int timesTotal communicationMode CommunicationMode.SYNC ? 1 this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;int times 0;String[] brokersSent new String[timesTotal];for (; times timesTotal; times) {String lastBrokerName null mq ? null : mq.getBrokerName();MessageQueue tmpmq this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // 2if (tmpmq ! null) {mq tmpmq;brokersSent[times] mq.getBrokerName();try {beginTimestampPrev System.currentTimeMillis();sendResult this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout); // 3endTimestamp System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);switch (communicationMode) {case ASYNC:return null;case ONEWAY:return null;case SYNC:if (sendResult.getSendStatus() ! SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}return sendResult;default:break;}} catch (RemotingException e) {endTimestamp System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); // 4log.warn(String.format(sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s, invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception e;continue;} catch (MQClientException e) {endTimestamp System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format(sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s, invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception e;continue;} catch (MQBrokerException e) {endTimestamp System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format(sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s, invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception e;switch (e.getResponseCode()) {case ResponseCode.TOPIC_NOT_EXIST:case ResponseCode.SERVICE_NOT_AVAILABLE:case ResponseCode.SYSTEM_ERROR:case ResponseCode.NO_PERMISSION:case ResponseCode.NO_BUYER_ID:case ResponseCode.NOT_IN_CURRENT_UNIT:continue;default:if (sendResult ! null) {return sendResult;}throw e;}} catch (InterruptedException e) {endTimestamp System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);log.warn(String.format(sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s, invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());log.warn(sendKernelImpl exception, e);log.warn(msg.toString());throw e;}} else {break;}}if (sendResult ! null) {return sendResult;}String info String.format(Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s,times,System.currentTimeMillis() - beginTimestampFirst,msg.getTopic(),Arrays.toString(brokersSent));info FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);MQClientException mqClientException new MQClientException(info, exception);if (exception instanceof MQBrokerException) {mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());} else if (exception instanceof RemotingConnectException) {mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);} else if (exception instanceof RemotingTimeoutException) {mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);} else if (exception instanceof MQClientException) {mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);}throw mqClientException;}ListString nsList this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();if (null nsList || nsList.isEmpty()) {throw new MQClientException(No name server address, please set it. FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);}throw new MQClientException(No route info of this topic, msg.getTopic() FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION); 主要的核心步骤如下 代码1获取topic的路由信息。 代码2根据topic负载均衡算法选择一个MessageQueue。 代码3向 MessageQueue 发送消息。 代码4更新失败策略,主要用于规避发生故障的 broker下文会详细介绍。 代码5如果是同步调用方式(SYNC),则执行失败重试策略默认重试两次。 2、2.2.1 获取topic的路由信息 首先我们来思考一下topic 的路由信息包含哪些内容。 消息的发布与订阅基于topic,路由发布信息以 topic 维度进行描述。 Broker 负载消息存储一个 topic 可以分布在多台 Broker 上(负载均衡)每个 Broker 包含多个 Queue。队列元数据基于Broker来描述QueueData所在 BrokerName、读队列个数、写队列个数、权限、同步或异步。 接下来从源码分析 tryToFindTopicPublishInfo方法详细了解获取 Topic 的路由信息。 DefaultMQProducerImpl#tryToFindTopicPublishInfo private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {TopicPublishInfo topicPublishInfo this.topicPublishInfoTable.get(topic); // 1if (null topicPublishInfo || !topicPublishInfo.ok()) {this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); // 2topicPublishInfo this.topicPublishInfoTable.get(topic);}if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { //3return topicPublishInfo;} else {this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); //4topicPublishInfo this.topicPublishInfoTable.get(topic);return topicPublishInfo;} 代码1从本地缓存(ConcurrentMap String/* topic */, TopicPublishInfo)中尝试获取第一次肯定为空走代码2的流程。 代码2尝试从 NameServer 获取配置信息并更新本地缓存配置。 代码3如果找到可用的路由信息并返回。 代码4如果未找到路由信息则再次尝试使用默认的 topic 去找路由配置信息。 接下来我们重点关注updateTopicRouteInfoFromNameServer方法。 MQClientInstance#updateTopicRouteInfoFromNameServer public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) {try {if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { // 1try {TopicRouteData topicRouteData;if (isDefault defaultMQProducer ! null) { //2topicRouteData this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),1000 * 3);if (topicRouteData ! null) {for (QueueData data : topicRouteData.getQueueDatas()) {int queueNums Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());data.setReadQueueNums(queueNums);data.setWriteQueueNums(queueNums);}}} else {topicRouteData this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3); //3}if (topicRouteData ! null) {TopicRouteData old this.topicRouteTable.get(topic); //4boolean changed topicRouteDataIsChange(old, topicRouteData); //5if (!changed) {changed this.isNeedUpdateTopicRouteInfo(topic); //6} else {log.info(the topic[{}] route info changed, old[{}] ,new[{}], topic, old, topicRouteData);}if (changed) { //7TopicRouteData cloneTopicRouteData topicRouteData.cloneTopicRouteData();for (BrokerData bd : topicRouteData.getBrokerDatas()) {this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());}// Update Pub info //8{TopicPublishInfo publishInfo topicRouteData2TopicPublishInfo(topic, topicRouteData);publishInfo.setHaveTopicRouterInfo(true);IteratorEntryString, MQProducerInner it this.producerTable.entrySet().iterator();while (it.hasNext()) {EntryString, MQProducerInner entry it.next();MQProducerInner impl entry.getValue();if (impl ! null) {impl.updateTopicPublishInfo(topic, publishInfo);}}}// Update sub info //9{SetMessageQueue subscribeInfo topicRouteData2TopicSubscribeInfo(topic, topicRouteData);IteratorEntryString, MQConsumerInner it this.consumerTable.entrySet().iterator();while (it.hasNext()) {EntryString, MQConsumerInner entry it.next();MQConsumerInner impl entry.getValue();if (impl ! null) {impl.updateTopicSubscribeInfo(topic, subscribeInfo);}}}log.info(topicRouteTable.put. Topic {}, TopicRouteData[{}], topic, cloneTopicRouteData);this.topicRouteTable.put(topic, cloneTopicRouteData);return true;}} else {log.warn(updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}, topic);}} catch (Exception e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) !topic.equals(MixAll.DEFAULT_TOPIC)) {log.warn(updateTopicRouteInfoFromNameServer Exception, e);}} finally {this.lockNamesrv.unlock();}} else {log.warn(updateTopicRouteInfoFromNameServer tryLock timeout {}ms, LOCK_TIMEOUT_MILLIS);}} catch (InterruptedException e) {log.warn(updateTopicRouteInfoFromNameServer Exception, e);}return false; 代码1为了避免重复从 NameServer 获取配置信息在这里使用了ReentrantLock,并且设有超时时间。固定为3000s。 代码23的区别一个是获取默认 topic 的配置信息一个是获取指定 topic 的配置信息该方法在这里就不跟踪进去了具体的实现就是通过与 NameServer 的长连接 Channel 发送 GET_ROUTEINTO_BY_TOPIC (105)命令获取配置信息。注意次过程的超时时间为3s由此可见NameServer的实现要求高效。 代码4、5、6从这里开始拿到最新的 topic 路由信息后需要与本地缓存中的 topic 发布信息进行比较如果有变化则需要同步更新发送者、消费者关于该 topic 的缓存。 代码7更新发送者的缓存。 代码8更新订阅者的缓存消费队列信息。 至此 tryToFindTopicPublishInfo 运行完毕从 NameServe r获取 TopicPublishData继续消息发送的第二个步骤选取一个消息队列。
http://www.w-s-a.com/news/28542/

相关文章:

  • 做网站激励语家居装饰网站设计论文
  • 镜像的网站怎么做排名无极网站建设质量
  • 奉贤集团公司网站建设小工具文本wordpress
  • 不用代码做网站网站建设和运行费用
  • 阜阳网站开发招聘网站建设合作协议申请
  • 电子配件 技术支持 东莞网站建设wordpress 生成html代码
  • 网站用免费空间好不好网站建设的视频
  • 网站开发项目职责门户资源分享网站模板
  • 建网站需要什么语言如何做二维码跳转到网站
  • 天津建设交培训中心网站做网站起名字
  • 黑河北京网站建设湛江市住房和城乡建设局网站
  • 网站建设拾金手指下拉十九企业查询官网
  • 邢台提供网站建设公司哪家好五合一建站
  • 京东网站设计代码驻马店做网站的公司
  • 织梦网站模板使用教程福州网站建设工作
  • 做网站要准备的需求asp 网站后台
  • 滨州网站开发公司中立建设集团有限公司网站
  • 泰安建设厅网站做网站为什么要建站点
  • 有什么好的手机推荐网站创建网站需要哪些工作
  • 网站能给企业带来什么上饶市网站建设公司
  • 学做网站卖东西去哪学南宁网站建设gxjzdrj
  • 欧美网站建设案例网站开发 男生
  • 网站正在开发中做电子元器件的网站
  • 做网站搭建的公司中国建设银行官网站u盾证书
  • 大连哪里有手机自适应网站建设公司网站介绍模板 html
  • 佛山模板建站宣传片制作公司电话
  • 文字网站居中能自己做网站接业务吗
  • 免备案自助建站网站广州珈瑶公司是哪一年注册的
  • ps做网站界面wordpress为图片添加圆角
  • seo优化推广业务员招聘seo顾问服务福建