暖通毕业设计代做网站,新产品的推广销售方法,网站源码检测,wordpress子页面不显示不出来一、带标签的Tag消息
1.1、概述 RocketMQ提供消息过滤的功能#xff0c;通过Tag或者Key进行区分。我们往一个主题里面发送消息的时候#xff0c;根据业务逻辑可能需要区分#xff0c;比如带有tagA标签的消息被消费者A消费#xff0c;带有tagB标签的消息被消费者B消费…一、带标签的Tag消息
1.1、概述 RocketMQ提供消息过滤的功能通过Tag或者Key进行区分。我们往一个主题里面发送消息的时候根据业务逻辑可能需要区分比如带有tagA标签的消息被消费者A消费带有tagB标签的消息被消费者B消费还有在事务监听的类里面只要是事务消息都要走同一个监听我们也需要通过过滤才能区别对待。
1.2、什么时候该用Topic什么时候该用Tag 不同的业务应该使用不同的Topic如果仅仅是相同的业务里边有不同的表现形式那么我们要使用Tag进行区分。至于说具体怎么选择可以从以下几个方面进行区分
1消息类型是否一致如普通消息、事务消息、延时消息、顺序消息、不同的消息类型使用不同的Topic无法通过Tag进行区分
2业务是否相关联没有直接关联的消息如淘宝交易信息、京东物流消息使用不同的Topic进行区分而同样是淘宝交易消息电器类订单、女装类订单、化妆品类订单的消息可以用Tag进行区分
3消息优先级是否一致如同样是物流消息盒马必须2小时内送达天猫超市24小时内送达淘宝物流则相对会慢一些不同优先级的消息用不同的Topic进行区分
4消息量级是否相当有些业务消息虽然量小但是实时性要求高如果跟某些万亿量级别的消息使用同一个Topic则有可能会因为过长的等待时间而饿死此时需要将不同量级的消息进行区分使用不同的Topic 总的来说针对消息分类、可以选择创建多个Topic或者在同一个Topic下创建多个Tag。但是通常情况下不同Topic之间的消息没有必然的联系。而Tag则用来区分同一个Topic下相互关联的消息例如全集和子集的关系流程先后的关系。
二、案例代码
2.1、pom 同案例五
2.2、RocketMQConstant 同案例五
2.3、消费者
2.3.1、TagConsumer1
package org.star.tag.consumer;import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.star.constants.RocketMQConstant;import java.util.List;/*** Author: 一叶浮萍归大海* Date: 2023/8/30 10:33* Description: Tag消息消费者*/
Slf4j
public class TagConsumer1 {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(TagConsumer1Group);consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);consumer.subscribe(TagTopic,NBA);consumer.setMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt list, ConsumeConcurrentlyContext context) {if (CollectionUtils.isNotEmpty(list)) {log.info(消费者[TagConsumer1]收到消息消息详情{}, StrUtil.utf8Str(list.get(0).getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();log.info(消费者[TagConsumer1] start success);}}2.3.2、TagConsumer2
package org.star.tag.consumer;import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.star.constants.RocketMQConstant;import java.util.List;/*** Author: 一叶浮萍归大海* Date: 2023/8/30 10:33* Description: Tag消息消费者*/
Slf4j
public class TagConsumer2 {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(TagConsumer2Group);consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);consumer.subscribe(TagTopic,RUN);consumer.setMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt list, ConsumeConcurrentlyContext context) {if (CollectionUtils.isNotEmpty(list)) {log.info(消费者[TagConsumer2]收到消息消息详情{}, StrUtil.utf8Str(list.get(0).getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();log.info(消费者[TagConsumer2] start success);}}2.3.3、TagConsumer3
package org.star.tag.consumer;import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.star.constants.RocketMQConstant;import java.util.List;/*** Author: 一叶浮萍归大海* Date: 2023/8/30 10:33* Description: Tag消息消费者*/
Slf4j
public class TagConsumer3 {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(TagConsumer3Group);consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);consumer.subscribe(TagTopic,STAR || CAR);consumer.setMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt list, ConsumeConcurrentlyContext context) {if (CollectionUtils.isNotEmpty(list)) {log.info(消费者[TagConsumer3]收到消息消息详情{}, StrUtil.utf8Str(list.get(0).getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();log.info(消费者[TagConsumer3] start success);}}2.3.4、TagConsumer4
package org.star.tag.consumer;import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.star.constants.RocketMQConstant;import java.util.List;/*** Author: 一叶浮萍归大海* Date: 2023/8/30 10:33* Description: Tag消息消费者*/
Slf4j
public class TagConsumer4 {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(TagConsumer4Group);consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);consumer.subscribe(TagTopic,CAR || MOBILE || TOURISM);consumer.setMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt list, ConsumeConcurrentlyContext context) {if (CollectionUtils.isNotEmpty(list)) {log.info(消费者[TagConsumer4]收到消息消息详情{}, StrUtil.utf8Str(list.get(0).getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();log.info(消费者[TagConsumer4] start success);}}2.4、TagProducer
package org.star.tag.producer;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.star.constants.RocketMQConstant;import java.nio.charset.StandardCharsets;/*** Author: 一叶浮萍归大海* Date: 2023/8/30 10:22* Description: Tag消息生产者*/
Slf4j
public class TagProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer new DefaultMQProducer(TagProducer);producer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);producer.start();log.info(Tag消息生产者 start success);String[] tags new String[]{NBA, RUN, STAR,CAR,MOBILE,TOURISM};for (int i 0; i 6; i) {String tag tags[i % tags.length];String content ;if (NBA.equals(tag)) {content NBA message消息编号[ i ];} else if (RUN.equals(tag)) {content RUN message消息编号[ i ];} else if (STAR.equals(tag)) {content STAR message消息编号[ i ];} else if (CAR.equals(tag)) {content CAR message消息编号[ i ];} else if (MOBILE.equals(tag)) {content MOBILE message消息编号[ i ];} else if (TOURISM.equals(tag)) {content TOURISM message消息编号[ i ];}log.info(当前tag{}消息内容{}, tag, content);Message message new Message(TagTopic, tag, content.getBytes(StandardCharsets.UTF_8));SendResult result producer.send(message);log.info(sendStatus:{},brokerName:{},queueId:{},msgId:{}, result.getSendStatus(), result.getMessageQueue().getBrokerName(), result.getMessageQueue().getQueueId(), result.getMsgId());}producer.shutdown();}}2.5 、控制台打印结果
# 生产者
09:53:44.850 [main] INFO org.star.tag.producer.TagProducer - Tag消息生产者 start success
09:53:44.850 [main] INFO org.star.tag.producer.TagProducer - 当前tagNBA消息内容NBA message消息编号[0]
09:53:45.308 [main] INFO org.star.tag.producer.TagProducer - sendStatus:SEND_OK,brokerName:broker-a,queueId:0,msgId:0AA867618F8018B4AAC2262C1D530000
09:53:45.308 [main] INFO org.star.tag.producer.TagProducer - 当前tagRUN消息内容RUN message消息编号[1]
09:53:45.315 [main] INFO org.star.tag.producer.TagProducer - sendStatus:SEND_OK,brokerName:broker-a,queueId:1,msgId:0AA867618F8018B4AAC2262C1D5C0001
09:53:45.315 [main] INFO org.star.tag.producer.TagProducer - 当前tagSTAR消息内容STAR message消息编号[2]
09:53:45.319 [main] INFO org.star.tag.producer.TagProducer - sendStatus:SEND_OK,brokerName:broker-a,queueId:2,msgId:0AA867618F8018B4AAC2262C1D640002
09:53:45.319 [main] INFO org.star.tag.producer.TagProducer - 当前tagCAR消息内容CAR message消息编号[3]
09:53:45.322 [main] INFO org.star.tag.producer.TagProducer - sendStatus:SEND_OK,brokerName:broker-a,queueId:3,msgId:0AA867618F8018B4AAC2262C1D680003
09:53:45.323 [main] INFO org.star.tag.producer.TagProducer - 当前tagMOBILE消息内容MOBILE message消息编号[4]
09:53:45.326 [main] INFO org.star.tag.producer.TagProducer - sendStatus:SEND_OK,brokerName:broker-a,queueId:0,msgId:0AA867618F8018B4AAC2262C1D6B0004
09:53:45.326 [main] INFO org.star.tag.producer.TagProducer - 当前tagTOURISM消息内容TOURISM message消息编号[5]
09:53:45.329 [main] INFO org.star.tag.producer.TagProducer - sendStatus:SEND_OK,brokerName:broker-a,queueId:1,msgId:0AA867618F8018B4AAC2262C1D6E0005# 消费者TagConsumer1
09:53:45.310 [ConsumeMessageThread_2] INFO org.star.tag.consumer.TagConsumer1 - 消费者[TagConsumer1]收到消息消息详情NBA message消息编号[0]# 消费者TagConsumer2
09:53:45.316 [ConsumeMessageThread_2] INFO org.star.tag.consumer.TagConsumer2 - 消费者[TagConsumer2]收到消息消息详情RUN message消息编号[1]# 消费者TagConsumer3
09:53:45.322 [ConsumeMessageThread_3] INFO org.star.tag.consumer.TagConsumer3 - 消费者[TagConsumer3]收到消息消息详情STAR message消息编号[2]
09:53:45.327 [ConsumeMessageThread_4] INFO org.star.tag.consumer.TagConsumer3 - 消费者[TagConsumer3]收到消息消息详情CAR message消息编号[3]# 消费者TagConsumer4
09:53:45.327 [ConsumeMessageThread_4] INFO org.star.tag.consumer.TagConsumer4 - 消费者[TagConsumer4]收到消息消息详情CAR message消息编号[3]
09:53:45.344 [ConsumeMessageThread_6] INFO org.star.tag.consumer.TagConsumer4 - 消费者[TagConsumer4]收到消息消息详情MOBILE message消息编号[4]
09:53:45.344 [ConsumeMessageThread_5] INFO org.star.tag.consumer.TagConsumer4 - 消费者[TagConsumer4]收到消息消息详情TOURISM message消息编号[5]