广州番禺网站制作推广,网站移动端做pc端的301跳转,沪尚茗居全包价格,做t恤网站 一件也可以做文章目录 一、顺序消息二、顺序消息消费过程1、消息队列负载2、消息拉取3、消息消费4、消息进度存储 三、总结 一、顺序消息 RocketMq在同一个队列中可以保证消息被顺序消费#xff0c;所以如果要做到消息顺序消费#xff0c;可以将消费主题#xff08;topic#xff09;设置… 文章目录 一、顺序消息二、顺序消息消费过程1、消息队列负载2、消息拉取3、消息消费4、消息进度存储 三、总结 一、顺序消息 RocketMq在同一个队列中可以保证消息被顺序消费所以如果要做到消息顺序消费可以将消费主题topic设置成一个队列。
二、顺序消息消费过程 同普通消息一样顺序消息消费需要经历4个步骤消息队列负载、消息拉取、消息消费、消息进度存储。
1、消息队列负载 消息队列负载由RebalanceService线程实现每隔20s对消息队列重新负载。
//org.apache.rocketmq.client.impl.consumer.RebalanceService#runOverridepublic void run() {log.info(this.getServiceName() service started);//20s重新负载一次while (!this.isStopped()) {this.waitForRunning(waitInterval);this.mqClientFactory.doRebalance();}log.info(this.getServiceName() service end);}在集群模式下同一个消费组内的消费者共同承担其订阅topic的消息队列的消费同一个消息队列在同一时刻只能被组内一个消费者消费一个消费者同一时刻可以分配多个消费队列 在经过消息队列负载时会创建新的消息拉取任务PullRequest如果判断是顺序消息消费isOrdertrue时需要向broker发起锁定该消息队列的请求this.lock(mq)如果锁定失败则跳过在下一次负载时再尝试加锁。
//org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalanceprivate boolean updateProcessQueueTableInRebalance(final String topic, final SetMessageQueue mqSet,final boolean isOrder) {boolean changed false;//遍历原来的缓存列表IteratorEntryMessageQueue, ProcessQueue it this.processQueueTable.entrySet().iterator();while (it.hasNext()) {EntryMessageQueue, ProcessQueue next it.next();MessageQueue mq next.getKey();ProcessQueue pq next.getValue();if (mq.getTopic().equals(topic)) {//新的结果中不包含原来缓存中的队列停止消费if (!mqSet.contains(mq)) {pq.setDropped(true);if (this.removeUnnecessaryMessageQueue(mq, pq)) {it.remove();changed true;log.info(doRebalance, {}, remove unnecessary mq, {}, consumerGroup, mq);}} else if (pq.isPullExpired()) {switch (this.consumeType()) {case CONSUME_ACTIVELY:break;case CONSUME_PASSIVELY:pq.setDropped(true);if (this.removeUnnecessaryMessageQueue(mq, pq)) {it.remove();changed true;log.error([BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it,consumerGroup, mq);}break;default:break;}}}}//启动新增加的消费队列ListPullRequest pullRequestList new ArrayListPullRequest();for (MessageQueue mq : mqSet) {if (!this.processQueueTable.containsKey(mq)) {if (isOrder !this.lock(mq)) {log.warn(doRebalance, {}, add a new mq failed, {}, because lock failed, consumerGroup, mq);continue;}this.removeDirtyOffset(mq);ProcessQueue pq new ProcessQueue();long nextOffset -1L;try {nextOffset this.computePullFromWhereWithException(mq);} catch (Exception e) {log.info(doRebalance, {}, compute offset failed, {}, consumerGroup, mq);continue;}if (nextOffset 0) {ProcessQueue pre this.processQueueTable.putIfAbsent(mq, pq);if (pre ! null) {log.info(doRebalance, {}, mq already exists, {}, consumerGroup, mq);} else {//创建新的PullRequest并加入到pullRequestService中log.info(doRebalance, {}, add a new mq, {}, consumerGroup, mq);PullRequest pullRequest new PullRequest();pullRequest.setConsumerGroup(consumerGroup);pullRequest.setNextOffset(nextOffset);pullRequest.setMessageQueue(mq);pullRequest.setProcessQueue(pq);pullRequestList.add(pullRequest);changed true;}} else {log.warn(doRebalance, {}, add new mq failed, {}, consumerGroup, mq);}}}this.dispatchPullRequest(pullRequestList);return changed;}2、消息拉取 消息拉取由PullMessageService线程处理当有消息拉取请求进来时马上处理拉取消息否则阻塞等待拉取请求。
//org.apache.rocketmq.client.impl.consumer.PullMessageService#runOverridepublic void run() {log.info(this.getServiceName() service started);while (!this.isStopped()) {try {//取一个PullRequest消息拉取任务如果pullRequestQueue为空则阻塞PullRequest pullRequest this.pullRequestQueue.take();//拉取消息this.pullMessage(pullRequest);} catch (InterruptedException ignored) {} catch (Exception e) {log.error(Pull Message Service Run Method exception, e);}}log.info(this.getServiceName() service end);}
//org.apache.rocketmq.client.impl.consumer.PullMessageService#pullMessageprivate void pullMessage(final PullRequest pullRequest) {//从MQClientInstance中获取内部实现类MQConsumerInnerfinal MQConsumerInner consumer this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if (consumer ! null) {//强转换成PUSH消息消费服务然后消费消息DefaultMQPushConsumerImpl impl (DefaultMQPushConsumerImpl) consumer;impl.pullMessage(pullRequest);} else {log.warn(No matched consumer for the PullRequest {}, drop it, pullRequest);}}处理消息拉取任务使用DefaultMQPushConsumerImpl实例。消息队列处于加锁状态时处理消息拉取请求否则将拉取请求放入延迟队列延迟3s再处理
//org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessagepublic void pullMessage(final PullRequest pullRequest) {//....代码省略if (!this.consumeOrderly) {//非顺序性消费//....代码省略} else {//顺序性消费if (processQueue.isLocked()) {//如果消息处理队列 锁住状态if (!pullRequest.isPreviouslyLocked()) {long offset -1L;try {//从负载中计算当前已拉取的消息进度偏移量offset this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());} catch (Exception e) {this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);log.error(Failed to compute pull offset, pullResult: {}, pullRequest, e);return;}//pullRequest下条消息偏移量比较大说明当前繁忙boolean brokerBusy offset pullRequest.getNextOffset();log.info(the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {},pullRequest, offset, brokerBusy);if (brokerBusy) {log.info([NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {},pullRequest, offset);}pullRequest.setPreviouslyLocked(true);pullRequest.setNextOffset(offset);}} else {//消息处理队列非锁住状态则延迟3s再加入拉取队列this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);log.info(pull message later because not locked in broker, {}, pullRequest);return;}}//...代码省略}3、消息消费 消息消费由实现类ConsumeMessageOrderlyService处理构造方法如下:
//org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#ConsumeMessageOrderlyServicepublic ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,MessageListenerOrderly messageListener) {//消息消费实现this.defaultMQPushConsumerImpl defaultMQPushConsumerImpl;//顺序消息消费监听器this.messageListener messageListener;//消息消费者this.defaultMQPushConsumer this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();//组名this.consumerGroup this.defaultMQPushConsumer.getConsumerGroup();//消费任务队列this.consumeRequestQueue new LinkedBlockingQueueRunnable();String consumeThreadPrefix null;if (consumerGroup.length() 100) {consumeThreadPrefix new StringBuilder(ConsumeMessageThread_).append(consumerGroup.substring(0, 100)).append(_).toString();} else {consumeThreadPrefix new StringBuilder(ConsumeMessageThread_).append(consumerGroup).append(_).toString();}//消费线程池this.consumeExecutor new ThreadPoolExecutor(this.defaultMQPushConsumer.getConsumeThreadMin(),this.defaultMQPushConsumer.getConsumeThreadMax(),1000 * 60,TimeUnit.MILLISECONDS,this.consumeRequestQueue,new ThreadFactoryImpl(consumeThreadPrefix));//任务调度线程池this.scheduledExecutorService Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(ConsumeMessageScheduledThread_));}启动时如果是集群模式则默认每隔20sProcessQueue.REBALANCE_LOCK_INTERVAL执行一次锁定分配给自己的消费队列只有标记队列锁定成功消息拉取任务才可以执行。
//org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#startpublic void start() {if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {try {ConsumeMessageOrderlyService.this.lockMQPeriodically();} catch (Throwable e) {log.error(scheduleAtFixedRate lockMQPeriodically exception, e);}}}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);}}上述代码中lockMQPeriodically()最终调用RebalanceImpl#lockAll方法执行队列加锁同时也包含对列解锁具体步骤如下
1、获取所有broker中的消息队列brokerMqs然后逐个遍历2、根据brokerName找到主节点findBrokerResult向该节点发送锁定消息队列请求this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ得到锁定成功的队列集合lockOKMQSet3、遍历锁定成功的队列集合lockOKMQSet将队列加锁状态以及加锁时间更新到本地队列缓存表this.processQueueTable中4、其余不在锁定队列集合lockOKMQSet中的队列设置加锁状态为false即解锁。这将会暂定对该消息队列的消息拉取以及消费 public void lockAll() {//构建消息处理队列HashMapString, SetMessageQueue brokerMqs this.buildProcessQueueTableByBrokerName();IteratorEntryString, SetMessageQueue it brokerMqs.entrySet().iterator();while (it.hasNext()) {EntryString, SetMessageQueue entry it.next();final String brokerName entry.getKey();final SetMessageQueue mqs entry.getValue();if (mqs.isEmpty())continue;//从订阅信息查找broker主节点FindBrokerResult findBrokerResult this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);if (findBrokerResult ! null) {LockBatchRequestBody requestBody new LockBatchRequestBody();requestBody.setConsumerGroup(this.consumerGroup);requestBody.setClientId(this.mQClientFactory.getClientId());requestBody.setMqSet(mqs);try {//向主broker发送锁定消息队列返回成功被当前消费者锁定的队列SetMessageQueue lockOKMQSet this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);for (MessageQueue mq : lockOKMQSet) {ProcessQueue processQueue this.processQueueTable.get(mq);if (processQueue ! null) {if (!processQueue.isLocked()) {log.info(the message queue locked OK, Group: {} {}, this.consumerGroup, mq);}//将返回回来的队列设置为锁定状态同时更新加锁时间processQueue.setLocked(true);processQueue.setLastLockTimestamp(System.currentTimeMillis());}}for (MessageQueue mq : mqs) {if (!lockOKMQSet.contains(mq)) {//如果返回的锁定队列不包含这个队列则设置加锁状态为falseProcessQueue processQueue this.processQueueTable.get(mq);if (processQueue ! null) {processQueue.setLocked(false);log.warn(the message queue locked Failed, Group: {} {}, this.consumerGroup, mq);}}}} catch (Exception e) {log.error(lockBatchMQ exception, mqs, e);}}}}在消息拉取任务完成拉取后回调处理触发消息消费DefaultMQPushConsumerImpl#pullMessage中PullCallback代码此时创建顺序消息消费任务ConsumeRequest 并放入线程池中。
//org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#submitConsumeRequestOverridepublic void submitConsumeRequest(final ListMessageExt msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispathToConsume) {if (dispathToConsume) {ConsumeRequest consumeRequest new ConsumeRequest(processQueue, messageQueue);this.consumeExecutor.submit(consumeRequest);}}4、消息进度存储 消息消费任务线程ConsumeRequest 消费完成后更改消息消费进度。值得注意的是在上述提到消息拉取后提交消费任务而消费任务并不止是消费本次拉取的消息而是消费队列中所有的未处理消息。
//org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService.ConsumeRequest#runOverridepublic void run() {if (this.processQueue.isDropped()) {log.warn(run, the message queue not be able to consume, because its dropped. {}, this.messageQueue);return;}final Object objLock messageQueueLock.fetchLockObject(this.messageQueue);synchronized (objLock) {if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())|| (this.processQueue.isLocked() !this.processQueue.isLockExpired())) {//广播模式直接消费集群模式process必须被锁定并且未超时final long beginTime System.currentTimeMillis();for (boolean continueConsume true; continueConsume; ) {if (this.processQueue.isDropped()) {log.warn(the message queue not be able to consume, because its dropped. {}, this.messageQueue);break;}if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) !this.processQueue.isLocked()) {log.warn(the message queue not locked, so consume later, {}, this.messageQueue);ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);break;}if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) this.processQueue.isLockExpired()) {log.warn(the message queue lock expired, so consume later, {}, this.messageQueue);ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);break;}long interval System.currentTimeMillis() - beginTime;if (interval MAX_TIME_CONSUME_CONTINUOUSLY) {ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);break;}final int consumeBatchSize ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();//按顺序取出consumeBatchSize条消息ListMessageExt msgs this.processQueue.takeMessages(consumeBatchSize);defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());if (!msgs.isEmpty()) {final ConsumeOrderlyContext context new ConsumeOrderlyContext(this.messageQueue);ConsumeOrderlyStatus status null;ConsumeMessageContext consumeMessageContext null;if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {//执行钩子函数consumeMessageContext new ConsumeMessageContext();consumeMessageContext.setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());consumeMessageContext.setMq(messageQueue);consumeMessageContext.setMsgList(msgs);consumeMessageContext.setSuccess(false);// init the consume context typeconsumeMessageContext.setProps(new HashMapString, String());ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);}long beginTimestamp System.currentTimeMillis();ConsumeReturnType returnType ConsumeReturnType.SUCCESS;boolean hasException false;try {//申请消息消费锁this.processQueue.getConsumeLock().lock();if (this.processQueue.isDropped()) {log.warn(consumeMessage, the message queue not be able to consume, because its dropped. {},this.messageQueue);break;}status messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);} catch (Throwable e) {log.warn(String.format(consumeMessage exception: %s Group: %s Msgs: %s MQ: %s,RemotingHelper.exceptionSimpleDesc(e),ConsumeMessageOrderlyService.this.consumerGroup,msgs,messageQueue), e);hasException true;} finally {this.processQueue.getConsumeLock().unlock();}if (null status|| ConsumeOrderlyStatus.ROLLBACK status|| ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT status) {log.warn(consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {},ConsumeMessageOrderlyService.this.consumerGroup,msgs,messageQueue);}long consumeRT System.currentTimeMillis() - beginTimestamp;if (null status) {if (hasException) {returnType ConsumeReturnType.EXCEPTION;} else {returnType ConsumeReturnType.RETURNNULL;}} else if (consumeRT defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {returnType ConsumeReturnType.TIME_OUT;} else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT status) {returnType ConsumeReturnType.FAILED;} else if (ConsumeOrderlyStatus.SUCCESS status) {returnType ConsumeReturnType.SUCCESS;}if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());}if (null status) {status ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.setStatus(status.toString());consumeMessageContext.setSuccess(ConsumeOrderlyStatus.SUCCESS status || ConsumeOrderlyStatus.COMMIT status);ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);}ConsumeMessageOrderlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);continueConsume ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);} else {continueConsume false;}}} else {if (this.processQueue.isDropped()) {log.warn(the message queue not be able to consume, because its dropped. {}, this.messageQueue);return;}ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);}}}}通过ConsumeRequest#run方法可以看出顺序消费任务有如下处理步骤
1、获取消息队列的锁对象加锁成功后synchronized (objLock)才可进行消费。这里是对整个消息队列加锁也说明同一时刻队列只会被一个线程消费2、如果是广播模式直接进入消费集群模式则需要进行判断如果消息队列已被锁定并且锁未超时进入消费否则将消息队列放入任务调度线程池中延时100ms处理3、进入循环消费当处理队列processQueue标示为丢弃则结束本次消费当处于集群模式下处理队列没有加锁或者加锁超时距离上一次加锁时间超过阈值REBALANCE_LOCK_MAX_LIVE_TIME默认30s延时10ms处理结束本次任务当消费时长interval超过阈值MAX_TIME_CONSUME_CONTINUOUSLY默认60s延时10ms处理结束本次任务4、按顺序取出consumeBatchSize默认1条消息如果没有消息则结束循环消费结束本次消费任务5、执行消息前置钩子函数6、申请消息消费锁调用消息监听器执行业务具体消费逻辑获取消费结果status7、根据消费结果status计算消费返回结果returnType如果存在钩子函数则将返回结果存入供后置钩子函数使用8、处理消费结果获取消息待更新进度并更新获取是否继续消费结果continueConsume
//org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#processConsumeResultif (commitOffset 0 !consumeRequest.getProcessQueue().isDropped()) {//更新消息消费进度this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);}三、总结
1、消息顺序消费需要将消费主题topic设置成一个队列 2、顺序消息消费需要经历4个步骤消息队列负载、消息拉取、消息消费、消息进度存储