微机课做网站,建设部网站官网 下载规范,受欢迎的合肥网站建设,松门建设规划局网站1、 基础概念
RocketMQ 支持两种消息模式#xff1a;集群消费#xff08; Clustering #xff09;和广播消费#xff08; Broadcasting #xff09;。
集群消费模式#xff08;Cluster#xff09;#xff1a; 在集群消费模式下#xff0c;同一个消费者组#xff08…1、 基础概念
RocketMQ 支持两种消息模式集群消费 Clustering 和广播消费 Broadcasting 。
集群消费模式Cluster 在集群消费模式下同一个消费者组Consumer Group中的每个消费者都会消费消息的一个副本。消息会被分发到不同的消费者实例上但是同一个消息只会被同一个消费者组中的一个消费者消费。
广播消费模式Broadcast 在广播消费模式下同一个消费者组中的每个消费者都会收到消息的一个副本即每个消费者都会独立地消费消息。消息会被广播到同一个消费者组中的所有消费者实例上。
怎么使用广播消费模式呢其实很简单通过在消费者的 RocketMQMessageListener 注解中设置 messageModel 参数为 MessageModel.BROADCASTING即可将消费者设置为广播模式。在广播模式下同一个消费者组中的每个消费者都会收到消息的一个副本每个消费者都会独立地消费消息从而实现了消息的广播消费。
2、 实现
消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import java.util.List;/*** 广播模式*/
public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {//根据情况修改消费者组DefaultMQPushConsumer consumer new DefaultMQPushConsumer(defaultGroup);consumer.setNamesrvAddr(127.0.0.1:9876);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);//设置setMessageModel(MessageModel.BROADCASTING) 即可设置成广播模式//此时你发送的消息会在所有的Consumer都会收到,而不会只往一个组里面的一个消费者去消费/**这里可以设置两种模式: 默认都是CLUSTERING(CLUSTERING)* BROADCASTING(BROADCASTING) 广播模式* CLUSTERING(CLUSTERING) 集群模式*/consumer.setMessageModel(MessageModel.BROADCASTING);//根据情况修改消费的topicconsumer.subscribe(TopicTest, *);consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs,ConsumeConcurrentlyContext context) {System.out.printf(%s Receive New Messages: %s %n, Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf(Broadcast Consumer Started.%n);}
}生产者
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer new DefaultMQProducer(defaultGroup);//NameServer 可以在代码中指定,也可以通过配置环境变量的方式指定mq的地址producer.setNamesrvAddr(127.0.0.1:9876);producer.start();try {{Message msg new Message(TopicTest, // 发送的topicAAA, //tagsBBB, // keysCCC.getBytes(RemotingHelper.DEFAULT_CHARSET) // 发送的内容);//同步传递消息消息会发给集群中的一个Broker节点。//这个发送方法是void方法,说明这个消息发送过去了之后,Producer是不知道的//不知道消息是否发送成功,反正Producer发送完了就不管了 .producer.sendOneway(msg);}} catch (Exception e) {e.printStackTrace();}producer.shutdown();}
}