网站建设 指标,自己做网站运营,中信建设有限责任公司海南分公司,网站设计成品1、Broker概述
Broker 在 RocketMQ 架构中的角色#xff0c;就是存储消息#xff0c;核心任务就是持久化消息#xff0c;生产者发送消息给 Broker,消费者从 Broker 消费消息#xff0c;其物理部署架构图如下#xff1a; 备注#xff1a;以上摘录自官方 RocketMQ 设计文档…1、Broker概述
Broker 在 RocketMQ 架构中的角色就是存储消息核心任务就是持久化消息生产者发送消息给 Broker,消费者从 Broker 消费消息其物理部署架构图如下 备注以上摘录自官方 RocketMQ 设计文档。
上述基本描述了消息中间件的架构设计不仅限于 RocketMQ,不同消息中间件的最大区别之一在消息的存储上。
2、Broker存储设计概要 接下来从配置文件的角度来窥探 Broker 存储设计的关注点对应代码MessageStoreConfig。
storePathRootDir 设置Broker的存储根目录默认为 $Broker_Home/store。storePathCommitLog 设置commitlog的存储目录默认为$Broker_Home/store/commitlog。mapedFileSizeCommitLog commitlog 文件的大小默认为1G。mapedFileSizeConsumeQueue consumeQueueSizeConsumeQueue 存放的是定长的信息20个字节偏移量、size、tagscode,默认30w * ConsumeQueue.CQ_STORE_UNIT_SIZE。enableConsumeQueueExt 是否开启 consumeQueueExt,默认为 false,就是如果消费端消息消费速度跟不上是否创建一个扩展的 ConsumeQueue文件如果不开启应该会阻塞从 commitlog 文件中获取消息并且 ConsumeQueue,应该是按topic独立的。mappedFileSizeConsumeQueueExt 扩展consume文件的大小默认为48M。flushIntervalCommitLog 刷写 CommitLog 的间隔时间RocketMQ 后台会启动一个线程将消息刷写到磁盘这个也就是该线程每次运行后等待的时间默认为500毫秒。flush 操作调用文件通道的force()方法。commitIntervalCommitLog 提交消息到 CommitLog 对应的文件通道的间隔时间原理与上面类似将消息写入到文件通道调用FileChannel.write方法得到最新的写指针默认为200毫秒。useReentrantLockWhenPutMessage 在put message( 将消息按格式封装成msg放入相关队列时实用的锁机制自旋或ReentrantLock)。flushIntervalConsumeQueue 刷写到ConsumeQueue的间隔默认为1s。flushCommitLogLeastPages 每次 flush commitlog 时最小发生变化的页数。commitCommitLogLeastPages 每一次 commitlog 提交任务至少需要的页数。flushLeastPagesWhenWarmMapedFile 用字节0填充整个文件每多少页刷盘一次默认4096异步刷盘模式生效。flushConsumeQueueLeastPages 一次刷盘至少需要的脏页数量默认为2针对 consuequeue 文件。putMsgIndexHightWater 当前版本未使用。
接下来从如下方面去深入其实现
1生产者发送消息
2消息协议格式
3消息存储、检索
4消费队列维护
5消息消费、重试等机制
2.1 消息发送
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl sendDefaultImpl方法源码分析
rprivate SendResult sendDefaultImpl(//Message msg, // final CommunicationMode communicationMode, //final SendCallback sendCallback, //final long timeout//) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {2.1.1 消息发送参数详解
1、Message msg 2、communicationMode communicationMode
发送方式SYNC(同步)、ASYNC异步、ONEWAY(单向不关注返回)
3、SendCallback sendCallback
异步消息发送回调函数。
4、long timeout
消息发送超时时间。
2.2.2 消息发送流程
默认消息发送实现
private SendResult sendDefaultImpl(//Message msg, //final CommunicationMode communicationMode, //final SendCallback sendCallback, //final long timeout//) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);final long invokeID random.nextLong();long beginTimestampFirst System.currentTimeMillis();long beginTimestampPrev beginTimestampFirst;long endTimestamp beginTimestampFirst;TopicPublishInfo topicPublishInfo this.tryToFindTopicPublishInfo(msg.getTopic()); // 1if (topicPublishInfo ! null topicPublishInfo.ok()) {MessageQueue mq null;Exception exception null;SendResult sendResult null;int timesTotal communicationMode CommunicationMode.SYNC ? 1 this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;int times 0;String[] brokersSent new String[timesTotal];for (; times timesTotal; times) {String lastBrokerName null mq ? null : mq.getBrokerName();MessageQueue tmpmq this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // 2if (tmpmq ! null) {mq tmpmq;brokersSent[times] mq.getBrokerName();try {beginTimestampPrev System.currentTimeMillis();sendResult this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout); // 3endTimestamp System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);switch (communicationMode) {case ASYNC:return null;case ONEWAY:return null;case SYNC:if (sendResult.getSendStatus() ! SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}return sendResult;default:break;}} catch (RemotingException e) {endTimestamp System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); // 4log.warn(String.format(sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s, invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception e;continue;} catch (MQClientException e) {endTimestamp System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format(sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s, invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception e;continue;} catch (MQBrokerException e) {endTimestamp System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format(sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s, invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception e;switch (e.getResponseCode()) {case ResponseCode.TOPIC_NOT_EXIST:case ResponseCode.SERVICE_NOT_AVAILABLE:case ResponseCode.SYSTEM_ERROR:case ResponseCode.NO_PERMISSION:case ResponseCode.NO_BUYER_ID:case ResponseCode.NOT_IN_CURRENT_UNIT:continue;default:if (sendResult ! null) {return sendResult;}throw e;}} catch (InterruptedException e) {endTimestamp System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);log.warn(String.format(sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s, invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());log.warn(sendKernelImpl exception, e);log.warn(msg.toString());throw e;}} else {break;}}if (sendResult ! null) {return sendResult;}String info String.format(Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s,times,System.currentTimeMillis() - beginTimestampFirst,msg.getTopic(),Arrays.toString(brokersSent));info FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);MQClientException mqClientException new MQClientException(info, exception);if (exception instanceof MQBrokerException) {mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());} else if (exception instanceof RemotingConnectException) {mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);} else if (exception instanceof RemotingTimeoutException) {mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);} else if (exception instanceof MQClientException) {mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);}throw mqClientException;}ListString nsList this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();if (null nsList || nsList.isEmpty()) {throw new MQClientException(No name server address, please set it. FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);}throw new MQClientException(No route info of this topic, msg.getTopic() FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
主要的核心步骤如下
代码1获取topic的路由信息。
代码2根据topic负载均衡算法选择一个MessageQueue。
代码3向 MessageQueue 发送消息。
代码4更新失败策略,主要用于规避发生故障的 broker下文会详细介绍。
代码5如果是同步调用方式(SYNC),则执行失败重试策略默认重试两次。
2、2.2.1 获取topic的路由信息
首先我们来思考一下topic 的路由信息包含哪些内容。
消息的发布与订阅基于topic,路由发布信息以 topic 维度进行描述。
Broker 负载消息存储一个 topic 可以分布在多台 Broker 上(负载均衡)每个 Broker 包含多个 Queue。队列元数据基于Broker来描述QueueData所在 BrokerName、读队列个数、写队列个数、权限、同步或异步。
接下来从源码分析 tryToFindTopicPublishInfo方法详细了解获取 Topic 的路由信息。
DefaultMQProducerImpl#tryToFindTopicPublishInfo
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {TopicPublishInfo topicPublishInfo this.topicPublishInfoTable.get(topic); // 1if (null topicPublishInfo || !topicPublishInfo.ok()) {this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); // 2topicPublishInfo this.topicPublishInfoTable.get(topic);}if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { //3return topicPublishInfo;} else {this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); //4topicPublishInfo this.topicPublishInfoTable.get(topic);return topicPublishInfo;}
代码1从本地缓存(ConcurrentMap String/* topic */, TopicPublishInfo)中尝试获取第一次肯定为空走代码2的流程。
代码2尝试从 NameServer 获取配置信息并更新本地缓存配置。
代码3如果找到可用的路由信息并返回。
代码4如果未找到路由信息则再次尝试使用默认的 topic 去找路由配置信息。
接下来我们重点关注updateTopicRouteInfoFromNameServer方法。
MQClientInstance#updateTopicRouteInfoFromNameServer
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) {try {if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { // 1try {TopicRouteData topicRouteData;if (isDefault defaultMQProducer ! null) { //2topicRouteData this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),1000 * 3);if (topicRouteData ! null) {for (QueueData data : topicRouteData.getQueueDatas()) {int queueNums Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());data.setReadQueueNums(queueNums);data.setWriteQueueNums(queueNums);}}} else {topicRouteData this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3); //3}if (topicRouteData ! null) {TopicRouteData old this.topicRouteTable.get(topic); //4boolean changed topicRouteDataIsChange(old, topicRouteData); //5if (!changed) {changed this.isNeedUpdateTopicRouteInfo(topic); //6} else {log.info(the topic[{}] route info changed, old[{}] ,new[{}], topic, old, topicRouteData);}if (changed) { //7TopicRouteData cloneTopicRouteData topicRouteData.cloneTopicRouteData();for (BrokerData bd : topicRouteData.getBrokerDatas()) {this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());}// Update Pub info //8{TopicPublishInfo publishInfo topicRouteData2TopicPublishInfo(topic, topicRouteData);publishInfo.setHaveTopicRouterInfo(true);IteratorEntryString, MQProducerInner it this.producerTable.entrySet().iterator();while (it.hasNext()) {EntryString, MQProducerInner entry it.next();MQProducerInner impl entry.getValue();if (impl ! null) {impl.updateTopicPublishInfo(topic, publishInfo);}}}// Update sub info //9{SetMessageQueue subscribeInfo topicRouteData2TopicSubscribeInfo(topic, topicRouteData);IteratorEntryString, MQConsumerInner it this.consumerTable.entrySet().iterator();while (it.hasNext()) {EntryString, MQConsumerInner entry it.next();MQConsumerInner impl entry.getValue();if (impl ! null) {impl.updateTopicSubscribeInfo(topic, subscribeInfo);}}}log.info(topicRouteTable.put. Topic {}, TopicRouteData[{}], topic, cloneTopicRouteData);this.topicRouteTable.put(topic, cloneTopicRouteData);return true;}} else {log.warn(updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}, topic);}} catch (Exception e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) !topic.equals(MixAll.DEFAULT_TOPIC)) {log.warn(updateTopicRouteInfoFromNameServer Exception, e);}} finally {this.lockNamesrv.unlock();}} else {log.warn(updateTopicRouteInfoFromNameServer tryLock timeout {}ms, LOCK_TIMEOUT_MILLIS);}} catch (InterruptedException e) {log.warn(updateTopicRouteInfoFromNameServer Exception, e);}return false;
代码1为了避免重复从 NameServer 获取配置信息在这里使用了ReentrantLock,并且设有超时时间。固定为3000s。
代码23的区别一个是获取默认 topic 的配置信息一个是获取指定 topic 的配置信息该方法在这里就不跟踪进去了具体的实现就是通过与 NameServer 的长连接 Channel 发送 GET_ROUTEINTO_BY_TOPIC (105)命令获取配置信息。注意次过程的超时时间为3s由此可见NameServer的实现要求高效。
代码4、5、6从这里开始拿到最新的 topic 路由信息后需要与本地缓存中的 topic 发布信息进行比较如果有变化则需要同步更新发送者、消费者关于该 topic 的缓存。
代码7更新发送者的缓存。
代码8更新订阅者的缓存消费队列信息。
至此 tryToFindTopicPublishInfo 运行完毕从 NameServe r获取 TopicPublishData继续消息发送的第二个步骤选取一个消息队列。