asp.net做网站,小程序开发外包注意事项,湘潭公司网站建设,网站制作说明文章目录 1. 前言2. 消费者重平衡服务 RebalanceService3. doRebalance4. rebalanceByTopic 对 topic 下面的消息队列重平衡5. updateProcessQueueTableInRebalance 更新本地缓存5.1 removeUnnecessaryMessageQueue 删除不需要的消息队列5.2 removeDirtyOffset 删除偏移量5.3 d… 文章目录 1. 前言2. 消费者重平衡服务 RebalanceService3. doRebalance4. rebalanceByTopic 对 topic 下面的消息队列重平衡5. updateProcessQueueTableInRebalance 更新本地缓存5.1 removeUnnecessaryMessageQueue 删除不需要的消息队列5.2 removeDirtyOffset 删除偏移量5.3 dispatchPullRequest 提交消息拉取请求 6. messageQueueChanged 更新消息队列信息7. 小结 本文章基于 RocketMQ 4.9.3 1. 前言
【RocketMQ】- 源码系列目录【RocketMQ 生产者消费者】- 同步、异步、单向发送消费消息【RocketMQ 生产者和消费者】- 消费者启动源码
上一篇文章中我们探讨了消费者的启动这里就来看下消费者的重平衡逻辑所谓重平衡就是消费者对订阅的 topic 下面的队列重分配topic 下面有不同的 MessageQueue这些队列有可能会存储到不同的 broker 上消费者消费的时候会消费分配到的 MessageQueue 下面的消息分配的过程就是重平衡。当然了为了使得消息队列能否更均衡又或者按照一些特定条件比如机房的区域进行划分RocketMQ 也提供了多种策略下面就先来看下 RocketMQ 的重平衡服务。 2. 消费者重平衡服务 RebalanceService
在文章 【RocketMQ Broker 相关源码】- NettyRemotingClient 和 NettyRemotingServer 中我们说过当消费者向 broker 注册或者是取消注册的的时候会触发 DefaultConsumerIdsChangeListener 的 CHANGE 事件在里面会去通知消费者重平衡。
同时在消费者启动的时候会通过 this.mQClientFactory.rebalanceImmediately() 唤醒重平衡服务 RebalanceService这个服务也是一个线程每次执行都会阻塞 20s如果出现像上面的消费者变更的情况就会提前唤醒来执行。
那上面是消费者初始化最后唤醒重平衡服务那唤醒之前总得启动吧重平衡又是在哪启动的呢其实就是在 MQClientInstance 里面启动消费者启动源码有一行 mQClientFactory.start()就是启动 MQClientInstance 的这个方法我们在前面生产者启动也讲过了所以就不多说直接到里面看里面做了什么。 可以看到 MQClientInstance 的 start 方法开启了负载均衡服务那开启了为什么又要唤醒呢其实跟这个类的 run 方法有关了下面来看下这个类 RebalanceService 里面的整体代码代码不多。
/*** 消费者重平衡服务*/
public class RebalanceService extends ServiceThread {private static long waitInterval Long.parseLong(System.getProperty(rocketmq.client.rebalance.waitInterval, 20000));private final InternalLogger log ClientLogger.getLog();private final MQClientInstance mqClientFactory;public RebalanceService(MQClientInstance mqClientFactory) {this.mqClientFactory mqClientFactory;}Overridepublic void run() {log.info(this.getServiceName() service started);while (!this.isStopped()) {// 阻塞等待 20sthis.waitForRunning(waitInterval);// 重平衡this.mqClientFactory.doRebalance();}log.info(this.getServiceName() service end);}Overridepublic String getServiceName() {return RebalanceService.class.getSimpleName();}
}可以看到 run 方法执行的时候是会先阻塞 20s再开始重平衡的因为消费者启动重平衡服务之后还没有把自己注册到 broker也就是没有向 broker 发送心跳也没有更新当前消费者订阅的 topic 路由信息这种情况下如果开始重平衡也获取不到 topic 下面的队列所以先阻塞 20s20s 的时间足够完成这些事了。最后再通过 this.mQClientFactory.rebalanceImmediately() 唤醒重平衡服务主动进行重平衡。
这里就是重平衡的入口了但是还有一个方法也会调用重平衡就是 resetOffset不过这个方法我也还没看是用来干什么的现在暂时没涉及到这块的源码有兴趣的朋友可以去看下。现在来看下 doRebalance 方法的逻辑。 3. doRebalance
/*** 消费者执行重平衡的逻辑, 所谓重平衡就是对 topic 下面的 MessageQueue 进行负载均衡*/
public void doRebalance() {// 遍历所有消费者for (Map.EntryString, MQConsumerInner entry : this.consumerTable.entrySet()) {MQConsumerInner impl entry.getValue();if (impl ! null) {try {// 执行重平衡impl.doRebalance();} catch (Throwable e) {log.error(doRebalance exception, e);}}}
}Override
public void doRebalance() {// 如果消费者服务没有暂停if (!this.pause) {// 执行重平衡操作this.rebalanceImpl.doRebalance(this.isConsumeOrderly());}
}可以看到重平衡的逻辑就是遍历这个 MQClientInstance 下面的所有消费者对每一个消费者执行重平衡当前版本就是一个消费者。最终执行重平衡的时候会传入 isOrder 参数意思是是否顺序消费这里我们就当正常非顺序消费。下面来看下 doRebalance 这个方法。
/*** 执行重平衡* param isOrder 是否是顺序消费*/
public void doRebalance(final boolean isOrder) {// 获取当前消费者的订阅信息集合MapString, SubscriptionData subTable this.getSubscriptionInner();if (subTable ! null) {// 遍历所有订阅信息for (final Map.EntryString, SubscriptionData entry : subTable.entrySet()) {// topic 信息final String topic entry.getKey();try {// 对 topic 进行重平衡this.rebalanceByTopic(topic, isOrder);} catch (Throwable e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn(rebalanceByTopic Exception, e);}}}}// 重平衡之后这个 Consumer 会分配到一些 MessageQueue, 这时候就需要删掉不属于当前 Consumer 分配到的 MessageQueuethis.truncateMessageQueueNotMyTopic();
}可以看到这个方法就是获取当前消费者的订阅信息集合也就是当前消费者订阅的哪个 topic订阅这个 topic 下面的哪个 tag还有订阅模式是什么TAG 过滤还是 SQL92 过滤获取到订阅信息之后遍历所有订阅消息然后获取订阅的 topic接着调用 rebalanceByTopic 来对这个 topic 下面的消费队列进行重平衡就是看看这个消费者能分配到这个 topic 下面的哪个消息队列下面就来重点看下这个 rebalanceByTopic 方法。 4. rebalanceByTopic 对 topic 下面的消息队列重平衡
下面来看下这个方法里面的逻辑先来看下这个方法的参数。
/*** 根据 topic 进行重平衡* param topic* param isOrder*/
private void rebalanceByTopic(final String topic, final boolean isOrder) {switch (messageModel) {// 广播模式case BROADCASTING: {...}// 集群模式, 会基于负载均衡策略确定分配给当前消费者的 MessageQueuecase CLUSTERING: {...}default: break;}
}可以看到重平衡会分为两种模式分别去处理广播模式和集群模式。首先我们知道广播模式就是一条消息会被这个消费者组下面的消费者去消费所以广播模式下不需要进行负载均衡因为每一个消费者都会消费同一条消息一遍。
然后是集群模式集群模式下每一个消费者都会分配到单独的消息队列且一个消息队列只会分配给一个消费者分配的方式就是负载均衡策略这个后面再看下面先来看下广播模式下的处理。
// 广播模式
case BROADCASTING: {// 广播模式下不需要进行负载均衡, 消费者会消费所有队列, 这里就是获取这个 topic 下面的所有队列SetMessageQueue mqSet this.topicSubscribeInfoTable.get(topic);if (mqSet ! null) {// 更新 processQueueTable 集合, ProcessQueue 和 MessageQueue 是 1v1 的关系, 可以把 ProcessQueue 看成是 MessageQueue 的处理类boolean changed this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);if (changed) {// 如果消费者负责的消费队列发生了变更, 比如又分配多了几个队列或者少几个队列, 又或者消费的队列变了// 设置本地的订阅缓存的版本, 设置流控参数, 最后上报新的订阅关系到 broker 中this.messageQueueChanged(topic, mqSet, mqSet);log.info(messageQueueChanged {} {} {} {},consumerGroup,topic,mqSet,mqSet);}} else {log.warn(doRebalance, {}, but the topic[{}] not exist., consumerGroup, topic);}break;
}可以看到广播模式下由于 topic 下面的消息会发送给消费者组下面的所有消费者去消费所以这里就是通过 topicSubscribeInfoTable 集合直接获取这个 topic 下面的所有消费者队列接着通过 updateProcessQueueTableInRebalance 方法更新 processQueueTable 集合ProcessQueue 和 MessageQueue 是 1v1 的关系可以把 ProcessQueue 看成是 MessageQueue 的处理类。
当然了如果经过负载均衡之后消费者负责的消费队列发生了变更比如又分配多了几个队列或者少几个队列又或者消费的队列变了那么设置本地的订阅缓存的版本设置流控参数最后上报新的订阅关系到 broker 中。因为流控会对 topic 下面的消息进行流控所以当消费者消息队列变更就需要重新根据流控参数来计算新的流控值同时又由于消费者向 broker 上报信息的时候会通过版本号来判断订阅信息是否发生变化了的所以这里会更新下当前消费者存储的订阅信息版本因为消费者上报的心跳信息其实也是订阅信息。
好了上面就是广播模式下面的负载均衡其实就是将这个 topic 的所有队列消息都分配给这个消费者。相对比广播模式集群模式要更复杂些涉及到如何分配队列给消费者下面来看下集群模式下的负载均衡也就是 case CLUSTERING 这一块的代码。
首先还是一样先获取 topic 下面的所有消费队列然后向 broker 获取这个消费者组下面的所有消费者 clientId 集合。
// 获取 topic 下面的所有消息队列
SetMessageQueue mqSet this.topicSubscribeInfoTable.get(topic);
// 获取当前消费者组的所有消费者的 clientId 集合
ListString cidAll this.mQClientFactory.findConsumerIdList(topic, consumerGroup);接下来判断下是否获取到消费者或者消息队列了如果没有获取到就打印下日志。
if (null mqSet) {// 如果找不到消息队列, 并且这个 topic 也不是消费者组的重传 topicif (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {// 打印下日志, 这里就是找不到 topic 的消息队列了, 或者说这个 topic 不存在log.warn(doRebalance, {}, but the topic[{}] not exist., consumerGroup, topic);}
}// 这里也是, 如果没找到消费者, 也打印下日志
if (null cidAll) {log.warn(doRebalance, {} {}, get consumer id list failed, consumerGroup, topic);
}如果能获取到就用负载均衡策略来分配消息队列给当前消费者分配消息队列在分配之前先把消费者集合的 clientId 加入到 List 集合中排序然后再去分配分配策略下一篇文章再说这篇文章就看下整体方法。
// 首先创建 MessageQueue 集合, 将消息队列添加到 mqAll 中
ListMessageQueue mqAll new ArrayListMessageQueue();
mqAll.addAll(mqSet);// 排序, MessageQueue 排序逻辑是先根据 topic brokerName queueId 这个顺序排序的
Collections.sort(mqAll);
// 这里就是对消费者按 clientId 排序
Collections.sort(cidAll);// 分配策略
AllocateMessageQueueStrategy strategy this.allocateMessageQueueStrategy;ListMessageQueue allocateResult null;
try {// 根据分配策略来分配消息队列, allocateResult 里面返回的就是当前消费者分配到的消息队列allocateResult strategy.allocate(this.consumerGroup,this.mQClientFactory.getClientId(),mqAll,cidAll);
} catch (Throwable e) {log.error(AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName{}, strategy.getName(),e);return;
}// 消息队列去重
SetMessageQueue allocateResultSet new HashSetMessageQueue();
if (allocateResult ! null) {allocateResultSet.addAll(allocateResult);
}负载均衡之后更新本地 processQueueTable 集合更新的方法是 updateProcessQueueTableInRebalance跟上面广播模式用的是一个方法而集群模式下传入的就是新分配的队列不是全部队列。然后如果消费者分配的消息队列发生了变化就更新本地的订阅缓存的版本, 设置流控参数, 最后上报新的订阅关系到 broker 中。
// 负载均衡之后更新本地 processQueueTable 集合
boolean changed this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {// 这里是消费者的消费队列发生了变化log.info(rebalanced result changed. allocateMessageQueueStrategyName{}, group{}, topic{}, clientId{}, mqAllSize{}, cidAllSize{}, rebalanceResultSize{}, rebalanceResultSet{},strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),allocateResultSet.size(), allocateResultSet);// 设置本地的订阅缓存的版本, 设置流控参数, 最后上报新的订阅关系到 broker 中this.messageQueueChanged(topic, mqSet, allocateResultSet);
}所以看到这里大家或许也有感觉集群模式下的负载均衡代码跟广播模式下的是差不多的只是集群模式下多了一个步骤就是分配消息队列。 5. updateProcessQueueTableInRebalance 更新本地缓存
上面我们说过当负载均衡获取到消息队列之后会调用这个 updateProcessQueueTableInRebalance 方法更新本地的 processQueueTable这个集合是消息队列 - 处理队列映射关系。ProcessQueue 处理队列是用来处理消息队列的MessageQueue 只存储了队列所属的 topicbrokerName队列 IDProcessQueue 是消息队列处理队列顾名思义就是专门去处理消费者队列操作的比如消息拉取什么的。
/**
* MessageQueue 和 ProcessQueue 是 1v1 的关系, 每一个消费者都有一个 RebalanceImpl, processQueueTable 存在于 RebalanceImpl 中,
* 因为 topic 可以分布在不同的 broker 中, 所以 topic 下面的队列也有可能分布在不同 broker 中, 每一个消费者都会负责 topic 下面的一部分
* 队列, MessageQueue 中就存储了 topic, broker, queueId, 表示这个队列分布在哪个 broker, ProcessQueue 里面实现了消息拉取、存储等
* 逻辑, 专门针对某个 MessageQueue
*/
protected final ConcurrentMapMessageQueue, ProcessQueue processQueueTable new ConcurrentHashMapMessageQueue, ProcessQueue(64);下面就来看下这个方法首先更新处理队列分为两个方面的逻辑第一个就是处理现存的处理队列第二个就是处理新增的处理队列。那首先我们来看第一点消费者负载均衡之后会分配到新的处理队列比如消费者原本是 [Queue1Queue2]新分配的是 [Queue2Queue3]这种情况下就需要从 processQueueTable 中删掉这个 Queue1 的映射同时判断下 Queue2 现在还有没有在用下面代码就是干这件事。
// 是否发生了更改
boolean changed false;IteratorEntryMessageQueue, ProcessQueue it this.processQueueTable.entrySet().iterator();
// 遍历消费者的 processQueueTable 集合, 这个集合存储了当前消费者之前需要消费的队列
while (it.hasNext()) {EntryMessageQueue, ProcessQueue next it.next();MessageQueue mq next.getKey();ProcessQueue pq next.getValue();// 如果这个队列是对应 topic 下的if (mq.getTopic().equals(topic)) {// 如果新分配的消息队列集合不包含当前遍历到的消息队列说明这个队列被移除了if (!mqSet.contains(mq)) {// 设置状态 dropped 为 true, 后面就不会消费这个队列里面的消息了pq.setDropped(true);// 删除不需要的 MessageQueue 的一些相关信息, 如偏移量...if (this.removeUnnecessaryMessageQueue(mq, pq)) {// 如果删除成功了, 将这个 mq 从集合中删掉it.remove();// 发生了更改changed true;log.info(doRebalance, {}, remove unnecessary mq, {}, consumerGroup, mq);}// 如果消费者最后一次拉取消息的时间间隔距离现在超过 120s 了, 这种情况下说明消息队列已经没用了} else if (pq.isPullExpired()) {switch (this.consumeType()) {case CONSUME_ACTIVELY:// 如果是 PULL 模式就不用管了, 因为 PULL 模式什么时候拉取消息是用户决定的break;case CONSUME_PASSIVELY:// 如果是 PUSH 模式就说明当前消费队列有问题了, 设置 dropped 状态为 truepq.setDropped(true);// 删除不需要的 MessageQueue 的一些相关信息, 如偏移量...if (this.removeUnnecessaryMessageQueue(mq, pq)) {// 如果删除成功了, 将这个 mq 从集合中删掉it.remove();// 发生了更改changed true;log.error([BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it,consumerGroup, mq);}break;default:break;}}}
}可以看到上面就是遍历当前消费者的 processQueueTable 集合然后判断如果队列是传入的 topic 下面的消息队列但是这个队列在最新分配的 mqSet 中不存在了说明这个队列已经负载均衡给其他消费者这种情况下设置队列状态 dropped 为 true后面就不会消费这个队列里面的消息了。
接下来调用 removeUnnecessaryMessageQueue 删掉一些队列的相关信息因为消息队列已经不需要消费了所以这种情况下本地存储的这个队列的相关信息如消费者偏移量也要删掉这个方法就是做这个事的。如果删除成功就把队列从 processQueueTable 集合中删掉并且设置 changed true。如果消费者最后一次拉取消息的时间间隔距离现在超过 120s 了这种情况下说明消息队列已经没用了正常情况下消息队列第一次分配给消费者就会提交一个消息拉取请求消息拉取服务会向 broker 拉取对应的消息更新消息拉取时间然后接着添加请求这种情况下超过 120s 没有拉取消息说明这个队列很有可能失效了比如配置错误导致没有被分配给任何一个消费者导致一直没有拉取消息。
当然上面的情况也要分为两种情况判断如果是 PULL 模式就不用管了因为 PULL 模式什么时候拉取消息是用户决定的用户自己决定怎么处理。如果是 PUSH 模式就设置 dropped 状态为 true删除不需要的 MessageQueue 的一些相关信息如偏移量… 最后如果删除成功了, 将这个 mq 从集合中删掉。
回到 updateProcessQueueTableInRebalance 方法继续往下看处理新增的队列。
ListPullRequest pullRequestList new ArrayListPullRequest();
// 遍历新分配的消息队列集合
for (MessageQueue mq : mqSet) {// 如果 processQueueTable 处理队列集合中不包括这个新的消息队列if (!this.processQueueTable.containsKey(mq)) {// 如果是顺序 order, 就需要加锁了, 这种情况下需要请求 broker 锁定这个消费队列, 这里相当于获取 broker 的分布式锁// 如果加锁失败了, 说明这个消息队列有可能还在被其他消费者消费, 那么本次重平衡不处理这个队列, 跳过这个队列if (isOrder !this.lock(mq)) {log.warn(doRebalance, {}, add a new mq failed, {}, because lock failed, consumerGroup, mq);continue;}// 从 offsetTable 中删除这个消息队列的消费点位信息, 既然是新分配的, 同时又不在 processQueueTable 中, 说明这个队列// 里面的消息需要重新拉取来处理了this.removeDirtyOffset(mq);// 创建新的 ProcessQueueProcessQueue pq new ProcessQueue();long nextOffset -1L;try {// 该 MessageQueue 下一次从哪里开始消费, pull 模式是用户自己决定, 返回 0, push 则是根据 consumeFromWhere 计算nextOffset this.computePullFromWhereWithException(mq);} catch (Exception e) {log.info(doRebalance, {}, compute offset failed, {}, consumerGroup, mq);continue;}// 这里就是获取到了消费点位if (nextOffset 0) {// 将当前 MessageQueue 和 ProcessQueue 存储到 processQueueTable 集合中, 注意这里是不存在才存入ProcessQueue pre this.processQueueTable.putIfAbsent(mq, pq);if (pre ! null) {// 如果已经存在对应的关系了, 这里应该是考虑了多线程把不然都已经判断 key 不存在才会进这个方法的log.info(doRebalance, {}, mq already exists, {}, consumerGroup, mq);} else {// 这里就是添加成功, 新建一个 PullRequestlog.info(doRebalance, {}, add a new mq, {}, consumerGroup, mq);PullRequest pullRequest new PullRequest();// 设置消费者组pullRequest.setConsumerGroup(consumerGroup);// 设置下一次拉取的消息点位pullRequest.setNextOffset(nextOffset);// 设置要拉取的消息队列pullRequest.setMessageQueue(mq);// 设置处理队列pullRequest.setProcessQueue(pq);// 添加到 pullRequestList 集合中pullRequestList.add(pullRequest);changed true;}} else {log.warn(doRebalance, {}, add new mq failed, {}, consumerGroup, mq);}}
}上面就是遍历新分配的消息队列集合然后判断如果 processQueueTable 处理队列集合中不包括这个新的消息队列说明这个队列是负载均衡新增的下面会判断
如果是 顺序 order, 就需要加锁了这种情况下需要请求 broker 锁定这个消费队列这里相当于获取 broker 的分布式锁如果加锁失败了说明这个消息队列有可能还在被其他消费者消费那么本次重平衡不处理这个队列跳过这个队列
顺序 order 需要确保消息队列的顺序性如果在负载均衡的时候这个队列加锁失败说明其他消费者在使用这个队列这种情况下直接不处理了等待下一次负载均衡再处理这个队列因为需要确保顺序性。
然后从 offsetTable 中删除这个消息队列的消费点位信息既然是新分配的同时又不在 processQueueTable 中说明这个队列里面的消息需要重新拉取来处理了于是先把原来的偏移量删掉。接着创建新的 ProcessQueue然后设置下一次要从这个队列的哪个位置开始拉取消息pull 模式是用户自己决定, 返回 0, push 则是根据 consumeFromWhere 计算默认是 CONSUME_FROM_LAST_OFFSET也就是从上此消费的位置继续消费。
最后将当前 MessageQueue 和 ProcessQueue 存储到 processQueueTable 集合中注意这里是不存在才存入新增成功之后新建一个 PullRequestPullRequest 就是消息拉取请求创建 PullRequest 之后设置下里面的拉取属性
消费者组下一次要拉取的消息的起始偏移量要拉取的消息队列消息队列对应的处理队列
最后分发 pullRequestList意思是把 pullRequestList 丢到消息拉取线程池中执行去拉取消息而 PULL 模式下拉取消息是消费者决定的所以 PULL 类型的消费者的实现都是空实现PUSH 模式下则是消费者自己去拉取消息来消费所以这些拉取请求会被 PullMessageService 服务去处理。 5.1 removeUnnecessaryMessageQueue 删除不需要的消息队列
这里我们只看 PUSH 模式的实现。
/*** 删除不必要的消息队列* param mq* param pq* return*/
Override
public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {// 首先持久化消息队列的偏移量到 broker, 实际上 persist 有本地和 broker 两种实现, 一般都是上报消费点位到 broker, 这里的消费点// 位意思是当前消费者已消费的消息在 ConsumeQueue 中的索引 1, 也就是下一条要消费的消息的索引位置this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);// 因为这个消息队列不属于这个消费者了, 所以需要接着把这个消费者的 offset 信息从本地缓存 offsetTable 中删掉this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);// 如果当前消费者是顺序消费且是集群模式, 就尝试请求 broker 解除消息队列锁, 并发消费或者广播模式就没必要了if (this.defaultMQPushConsumerImpl.isConsumeOrderly() MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {try {// 首先获取下处理队列的 consumeLock 锁, 这个锁在顺序消费消费者消费消息之前会获取, 这里要请求 broker 解除远端锁之前需// 要获取下本地锁, 获取时间是 100ms, 如果获取失败就意味着这个消息队列已经分配给其他消费者了, 但是当前顺序消费的消费者正// 在消费, 这种情况下就退出这个方法, 等待下次负载均衡之后再次请求 broker 解锁了if (pq.getConsumeLock().tryLock(1000, TimeUnit.MILLISECONDS)) {try {// 这里就是获取本地锁 consumeLock 成功了, 请求 broker 去解除消息队列锁return this.unlockDelay(mq, pq);} finally {// 最后解除 consumeLock 锁pq.getConsumeLock().unlock();}} else {// 打印下日志log.warn([WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {},mq,pq.getTryUnlockTimes());// 这就是上面说的情况, 当前队列正在被消费者消费, 尝试解锁次数 1pq.incTryUnlockTimes();}} catch (Exception e) {log.error(removeUnnecessaryMessageQueue Exception, e);}// 这里就是解锁失败, 返回 false, 移除失败return false;}// 解锁成功, 移除成功return true;
}传入的队列就是要删除的队列首先要做的就是持久化消息队列的偏移量到 broker实际上 persist 有本地和 broker 两种实现一般都是上报消费点位到 broker这里的消费点位意思是当前消费者已消费的消息在 ConsumeQueue 中的索引 1也就是下一条要消费的消息的索引位置持久化的用意就是当这个队列分配给其他消费者的时候消费者可以从这个队列上一次消费的位置开始继续拉取消息避免消息重复消费。
同时因为这个消息队列不属于这个消费者了所以需要接着把这个消费者的 offset 信息从本地缓存 offsetTable 中删掉最后如果当前消费者是顺序消费且是集群模式就尝试请求 broker 解除消息队列锁并发消费或者广播模式就没必要了。不过要注意下这里如果需要请求 broker 解除消息队列锁需要下面几个步骤
加本地锁 consumeLock解除 broker 队列锁解除本地锁 consumeLock 5.2 removeDirtyOffset 删除偏移量
消费者重平衡之后对于不再负责的消息队列需要将这些队列偏移量从本地缓存 offsetTable 中删掉。
Override
public void removeDirtyOffset(final MessageQueue mq) {this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
}/*** RemoteBrokerOffsetStore 的实现, 删除本地缓存* param mq*/
public void removeOffset(MessageQueue mq) {// 从本地缓存中删掉这个消息队列的偏移量if (mq ! null) {this.offsetTable.remove(mq);log.info(remove unnecessary messageQueue offset. group{}, mq{}, offsetTableSize{}, this.groupName, mq,offsetTable.size());}
}5.3 dispatchPullRequest 提交消息拉取请求
/*** 分发拉取消息的请求* param pullRequestList*/
Override
public void dispatchPullRequest(ListPullRequest pullRequestList) {for (PullRequest pullRequest : pullRequestList) {this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);log.info(doRebalance, {}, add a new pull request {}, consumerGroup, pullRequest);}
}对于新增的队列通过这个分发提交消息拉取请求到线程池去拉取消息。 6. messageQueueChanged 更新消息队列信息
前面也说过当消息队列发生了变更后面会调用这个方法去更新消息队列下面的一些参数比如消费者消息拉取的时候会有一个 topic 流控参数来限制这个消费者拉取并缓存到本地的消息数平均分配到每一个队列的数量就是 [这个流控参数 / 队列总数]所以当队列信息发生变更就需要调用这个方法去更新。
/*** 当消费队列发生变化, 这里会去更新本地的一些信息, 如订阅关系版本号、流控参数、发送心跳...* param topic topic 名称* param mqAll 这个 topic 下面的所有消息队列* param mqDivided 分配给当前消费者的消息队列*/
Override
public void messageQueueChanged(String topic, SetMessageQueue mqAll, SetMessageQueue mqDivided) {/*** When rebalance result changed, should update subscriptions version to notify broker.* Fix: inconsistency subscription may lead to consumer miss messages.** 1. 当消费者重平衡的时候分配的消费队列发生变化了, 这种情况下需要通知 broker 消费者消费队列发生更改了, 而 broker 是根据订阅信息* 版本来判断订阅信息是否发生变化了的, 所以这里会更新下当前消费者存储的版本*/// 获取消费者的订阅信息SubscriptionData subscriptionData this.subscriptionInner.get(topic);// 设置订阅信息的版本为// 新的版本long newVersion System.currentTimeMillis();log.info({} Rebalance changed, also update version: {}, {}, topic, subscriptionData.getSubVersion(), newVersion);subscriptionData.setSubVersion(newVersion);// 2. 重新设置 topic 的流控阈值, 包括消息总数和消息总大小// 获取当前消费者的处理队列的总数int currentQueueCount this.processQueueTable.size();if (currentQueueCount ! 0) {// 重新计算下 topic 流控阈值, 因为消息队列总数变化了, 所以这个消费者能拉取并缓存到本地的的 topic 消息数也应该发生变化才对int pullThresholdForTopic this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForTopic();// 如果是 -1 代表没有限制, 默认就是 -1if (pullThresholdForTopic ! -1) {// 如果不是 -1, 设置新的值为[原来的 / 当前总队列数]也就是 pullThresholdForTopic / currentQueueCountint newVal Math.max(1, pullThresholdForTopic / currentQueueCount);log.info(The pullThresholdForQueue is changed from {} to {},this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForQueue(), newVal);// 设置新的 topic 流控阈值this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdForQueue(newVal);}// topic 级别的流控阈值大小 (MB)int pullThresholdSizeForTopic this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForTopic();// 如果不等于 -1, 就是设置了流控阈值大小if (pullThresholdSizeForTopic ! -1) {// 取值为 pullThresholdSizeForTopic / currentQueueCountint newVal Math.max(1, pullThresholdSizeForTopic / currentQueueCount);log.info(The pullThresholdSizeForQueue is changed from {} to {},this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForQueue(), newVal);// 重新设置阈值this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(newVal);}}// 3. 发送心跳信息给 broker// notify brokerthis.getmQClientFactory().sendHeartbeatToAllBrokerWithLock();
}方法中首先获取下 topic 订阅消息然后更新下订阅信息的版本这样最后调用 sendHeartbeatToAllBrokerWithLock 发送心跳给 broker 的时候broker 就会更新消费者订阅信息了。
然后就是设置流控阈值首先就是 currentQueueCount这个参数代表了当前消费者负责的消息队列数量然后获取下 pullThresholdForTopic 这个参数这个参数就是从 topic 层面进行流控所谓主题层面的流控就是对消费者从整个主题中拉取并缓存的消息总量进行限制。其默认值为 -1意味着不做限制即消费者可以无限制地从该主题中拉取消息并缓存若 pullThresholdForTopic 被设置为一个非 -1 的有效数值也就是启用了主题级别的流控那么之前设置的 pullThresholdForQueue队列级别的流控阈值会被覆盖新的 pullThresholdForQueue 值会根据 pullThresholdForTopic 的值和分配给该消费者的消息队列数量重新计算得出。
计算方式就是Math.max(1, pullThresholdForTopic / currentQueueCount)
当然除了消息条数还有消息的大小限制也就是 pullThresholdSizeForTopic单位是 MB逻辑跟上面的 pullThresholdForTopic 都是一样的。
最后发送心跳信息给 broker更新 broker 里面的心跳信息。 7. 小结
好了这篇文章我们讲述了消费者重平衡的一部分源码还有剩下一点就是重平衡策略就留到下一篇文章再来探讨。 如有错误欢迎指出