当前位置: 首页 > news >正文

网站下面版权代码wordpress栏目标题被调用

网站下面版权代码,wordpress栏目标题被调用,电商网站支付方案,宁波关键词优化品牌文章目录 一、消息队列简介1.1 什么是消息队列1.2 常见消息队列对比1.3 RockectMQ 核心概念1.4 RockectMQ 工作机制 #xff08;★#xff09; 二、RocketMQ 部署相关2.1 服务器单机部署2.2 管控台页面 三、RocketMQ 的基本使用3.1 入门案例3.2 消息发送方式3.2.1 同步消息3.… 文章目录 一、消息队列简介1.1 什么是消息队列1.2 常见消息队列对比1.3 RockectMQ 核心概念1.4 RockectMQ 工作机制 ★ 二、RocketMQ 部署相关2.1 服务器单机部署2.2 管控台页面 三、RocketMQ 的基本使用3.1 入门案例3.2 消息发送方式3.2.1 同步消息3.2.2 异步消息3.2.3 一次性消息 3.3 消息消费方式3.3.1 集群模式3.3.2 广播模式 3.4 顺序消息3.5 延迟消息3.6 消息过滤3.6.1 Tag 过滤3.6.2 SQL92 过滤 四、SpringBoot 集成 RocketMQ4.1 入门案例4.2 消息发送方式4.2.1 同步消息4.2.2 异步消息4.2.3 一次性消息 4.3 消息消费方式4.3.1 集群模式4.3.2 广播模式 4.4 顺序消息4.5 延时消息4.6 消息过滤4.6.1 Tag 过滤4.6.2 SQL92 过滤 一、消息队列简介 1.1 什么是消息队列 消息队列MQ也叫消息队列中间件其主要通过消息的发送和接受来实现程序的异步解耦、削峰填谷以及数据分发但是 MQ 真正的目的是为了通讯。他屏蔽了复杂的通讯协议像常用的 dubbo、http 协议都是同步的。这两种协议很难实现双端通讯即A调用BB也可以主动调用A而且不支持长链接。MQ 做的就是在这些协议上构建一个简单协议——生产者、消费者模型MQ 带给我们的不是底层的通讯协议而是更高层次的通讯模型。他定义了两个对象发送数据的叫做生产者接受消息的叫做消费者我们可以无视底层的通讯协议并且可以自己定义生产者消费者。 参考消息队列详解 1.2 常见消息队列对比 1.3 RockectMQ 核心概念 生产者 Producer负责生产消息一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到 Broker 服务器。RocketMQ 提供多种发送方式同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要 Broker 返回确认信息单向发送不需要。 消费者 Consumer负责消费消息一般是后台系统负责异步消费。一个消息消费者会从 Broker 服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式拉取式消费、推动式消费。 名字服务 Name Server名称服务充当路由消息的提供者生产者或消费者能够通过名字服务查找各主题相应的 Broker IP 列表。多个Namesrv 实例组成集群但相互独立没有信息交换。 代理服务器 Broker Server消息中转角色负责存储消息、转发消息。代理服务器在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据包括消费者组、消费进度偏移和主题和队列消息等。 消息主题 Topic表示一类消息的集合每个主题包含若干条消息每条消息只能属于一个主题是 RocketMQ 进行消息订阅的基本单位。 消息队列 MessageQueue对于每个 Topic 都可以设置一定数量的消息队列用来进行数据的读取。 消息内容 Message消息系统所传输信息的物理载体生产和消费数据的最小单位每条消息必须属于一个主题。RocketMQ 中每个消息拥有唯一的 Message ID且可以携带具有业务标识的 Key。系统提供了通过 Message ID 和 Key 查询消息的功能。 标签 Tag为消息设置的标志用于同一主题 Topic 下区分不同类型的消息。来自同一业务单元的消息可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性并优化 RocketMQ 提供的查询系统。消费者可以根据 Tag 实现对不同子主题的不同消费逻辑实现更好的扩展性。 1.4 RockectMQ 工作机制 ★ RockectMQ 有自己的注册中心即 NameServer连接命名服务后会拉取代理服务器的列表到本地缓存生产者会通过负载均衡选出代理服务器的具体 IP然后向选出的代理服务器发送消息最终发送给消费者进行消费。如果发送的消息含有标签Tag那么会在消费者消费时进行消息的过滤。   其中每个 Topic 默认有 4 个 MessageQueue即 4 个写和读队列。在消息中间件每个 topic 设置 4 个队列主要是为了解决并发性能的问题。如果只有一个队列为保证线程安全必须得给队列进行写操作时上锁。设置 4 个队列也是由于大部分的服务器核心数都是 4 核的。 二、RocketMQ 部署相关 2.1 服务器单机部署 搜索资源rocketmq-all-4.4.0-bin-release.zip ① 将压缩包上传服务器,把rocketmq-all-4.4.0-bin-release.zip 拷贝到 /usr/local/software ② 使用解压命令进行解压到 /usr/local 目录 unzip /usr/local/software/rocketmq-all-4.4.0-bin-release.zip -d /usr/local③ 软件文件名重命名 mv /usr/local/rocketmq-all-4.4.0-bin-release/ /usr/local/rocketmq-4.4/④ 设置环境变量 vi /etc/profileexport JAVA_HOME/usr/local/jdk1.8 export ROCKETMQ_HOME/usr/local/rocketmq-4.4 export PATH$JAVA_HOME/bin:$ROCKETMQ_HOME/bin:$PATH修改环境变量后需要 source /etc/profile 使配置文件生效。 ⑤ 修改脚本中的 JVM 相关参数和启动参数的配置 vi /usr/local/rocketmq-4.4/bin/runbroker.shJAVA_OPT${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512mvi /usr/local/rocketmq-4.4/bin/runserver.shJAVA_OPT${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:MetaspaceSize128m -XX:MaxMetaspaceSize320m⑥ 修改配置文件设置 Nameserver 和 Broker-server 部署机器的IP地址。 vi /usr/local/rocketmq-4.4/conf/broker.conf注如果是服务器本身可以不设置 ⑦ 启动 NameServer # 1.启动NameServer 代表后台输出 nohup sh mqnamesrv # 2.查看启动日志 tail -f ~/logs/rocketmqlogs/namesrv.log⑧ 启动 Broker #1.启动Broker # nohup sh mqbroker -n 部署的IP地址:9876 -c /usr/local/rocketmq-4.4/conf/broker.conf nohup sh mqbroker -n localhost:9876 -c /usr/local/rocketmq-4.4/conf/broker.conf #2.查看启动日志 tail -f ~/logs/rocketmqlogs/broker.log ⑨ 使用 jps命令查看是否开启成功如果看到NamesrvStartup和BrokerStartup这两个进程则证明启动成功。 关闭 nameserversh mqshutdown namesrv 关闭 brokersh mqshutdown broker 另外服务器需要暂时关闭防火墙 systemctl stop firewalld并可使用 firewall-cmd --state 查看防火墙状态。 具体可参考Linux关闭防火墙命令 2.2 管控台页面 搜索资源rocketmq-console-ng-1.0.1.jar 在 jar 包的文件夹下新建一个配置文件 application.properties编辑管控台的端口和 NameServer 中心的 IP 地址使用 java -jar rocketmq-console-ng-1.0.1.jar 启动即可。 访问 http://localhost:9999/#/管控台界面如下 注管控台要求 jdk1.8。 三、RocketMQ 的基本使用 在一个工程中创建两个模块模拟生产者和消费者 3.1 入门案例 添加 rocketmq 的 pom 依赖 dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-client/artifactIdversion4.4.0/version /dependency生产者模块生产消息 public class Producer {public static void main(String[] args) throws Exception {// 定义一个生产者对象DefaultMQProducer producer new DefaultMQProducer(helloGroup);// 连接nameSeverproducer.setNamesrvAddr(192.168.63.101:9876);// 启动生产者producer.start();// 设置消息发送的目的地String topic helloTopic;// 发送消息for (int i 0; i 3; i) {Message message new Message(topic, (RocketMQ普通消息 i).getBytes(Charset.defaultCharset()));// 发送完成之后会返回响应结果SendResult result producer.send(message);System.out.println(发送状态 result.getSendStatus());}System.out.println(消息发送完毕);// 关闭资源producer.shutdown();} }消费者模块消费消息 public class Consumer {public static void main(String[] args) throws MQClientException {// 定义消息消费者(在同一个JVM中消费者的组名不能重复)DefaultMQPushConsumer consumer new DefaultMQPushConsumer(helloConsumerGroup);// 设置nameSever地址consumer.setNamesrvAddr(192.168.63.101:9876);// 设置订阅的主题consumer.subscribe(helloTopic, *); // * 消息不过滤// 设置消费模式默认集群// consumer.setMessageModel(MessageModel.CLUSTERING);// 设置消息的监听器consumer.setMessageListener(new MessageListenerConcurrently() { // MessageListenerConcurrently 代表多线程并发消费Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : list) {String content new String(msg.getBody(), Charset.defaultCharset());System.out.println(线程 Thread.currentThread() 消息内容 content);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 通知MQ消费正常// return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 通知MQ消费失败消费者会重新消费}});// 启动的消费者consumer.start();} }3.2 消息发送方式 3.2.1 同步消息 可靠性同步地发送方式使用的比较广泛比如重要的消息通知短信通知。上面演示的案例就是同步消息发送方式。 应用程序给消息中间件发送消息的时候需要等待消息中间件将消息存储完毕后才响应回去业务代码才能往下执行。 发送方式SendResult result producer.send(msg); 上面演示的案例就是同步发送。 3.2.2 异步消息 异步消息通常用在对响应时间敏感的业务场景即发送端不能容忍长时间地等待 Broker 的响应。 应用程序发送消息消息中间件收到这个消息之后直接给应用程序响应此时消息并没有完全存储到磁盘消息中间件继续存储消息通过回调地址通知有应用程序存储的结果成功或失败。 发送方式 producer.send(message, new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {...}Overridepublic void onException(Throwable throwable) {...} });public class AsynchronousProducer {public static void main(String[] args) throws Exception {// 定义一个生产者对象DefaultMQProducer producer new DefaultMQProducer(helloGroup);// 连接nameSeverproducer.setNamesrvAddr(192.168.63.100:9876);// 启动生产者producer.start();// 设置消息发送的目的地String topic helloTopic;// 发送消息Message message new Message(topic, (RocketMQ异步消息).getBytes(Charset.defaultCharset()));System.out.println(消息发送前);// 异步发送需要传递异步回调消息producer.send(message, new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {System.out.println(消息存储状态 sendResult.getSendStatus());}Overridepublic void onException(Throwable throwable) {System.out.println(消息发送出现异常);}});System.out.println(消息发送完毕);TimeUnit.SECONDS.sleep(5); // 为了等回调消息模拟程序睡眠5s后关闭资源// 关闭资源producer.shutdown();} }3.2.3 一次性消息 一次性消息主要用在不特别关心发送结果的场景例如日志发送。 发送方式producer.sendOneway(message); 应用程序给消息中间件发送消息的时候不需要知道消息是否在消息中间存储了只管发就是了。 public class OneTimeProducer {public static void main(String[] args) throws Exception {// 定义一个生产者对象DefaultMQProducer producer new DefaultMQProducer(helloGroup);// 连接nameSeverproducer.setNamesrvAddr(192.168.63.100:9876);// 启动生产者producer.start();// 设置消息发送的目的地String topic helloTopic;// 发送消息Message message new Message(topic, (RocketMQ一次性消息).getBytes(Charset.defaultCharset()));System.out.println(消息发送前);// 一次性消息producer.sendOneway(message);System.out.println(消息发送完毕);// 关闭资源producer.shutdown();} }3.3 消息消费方式 3.3.1 集群模式 消费者采用负载均衡方式消费消息多个消费者共同消费队列消息每个消费者处理的消息不同。 由于内部会根据 index % queue.size() 的方式来决定消息进哪个 messageQueue因此当多个机器做集群的时候也可能会发生消息消费分配不均等情况。如下面 topic 中一共有 10 个消息 入门案例默认就是集群的消费方式 3.3.2 广播模式 消费者采用广播的方式消费消息每个消费者消费的消息都是相同的。 设置消费模式consumer.setMessageModel(MessageModel.BROADCASTING); 3.4 顺序消息 从上文的消费结果来看在集群状态下消息的消费顺序是乱序的但有些场景是要求消息的消费是有序的这要怎么实现呢我们考虑以下两个场景 ① 如果在消费者做集群的情况下由于消息会分散在不同的队列中因此消息不可保证顺序消费如第四个消息比第一个消息更早被消费。因此可以考虑将消息全放在一个队列中。 注一个队列只会被一个消费者实例消费一个消费者实例可以消费多个队列。 ② 我们设置消费者的监听模式的时候使用的是 MessageListenerConcurrently 即多线程并发消费的形式那么当消息全存储在一个队列时由于 CPU 执行权等问题消费者实例中多线程会并发的进行消费也不会保证顺序消费。 // 一个队列对应一个实例的多个线程 consumer.setMessageListener(new MessageListenerConcurrently() { // MessageListenerConcurrently 代表多线程并发消费Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {...} });因此可以使用 MessageListenerOrderly 让一个队列只对应一个线程。 // 从什么地方开始消费队头开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 一个队列对应一个实例的一个线程 consumer.setMessageListener(new MessageListenerOrderly() {Overridepublic ConsumeOrderlyStatus consumeMessage(ListMessageExt list,ConsumeOrderlyContext consumeOrderlyContext) {...} });总结如果想要实现顺序消费在生产者的角度将消息存储在一个队列中在消费者的角度就是将消息对应消费者实例里的一个线程。 顺序消费案例生产者代码如下 public class Producer {public static void main(String[] args) throws Exception {// 定义一个生产者对象DefaultMQProducer producer new DefaultMQProducer(orderlyProducerGroup);// 连接nameSeverproducer.setNamesrvAddr(192.168.63.100:9876);// 启动生产者producer.start();// 设置消息发送的目的地String topic orderTopic;ListOrderStep orderSteps OrderUtil.buildOrders();// 设置队列的选择器// 将需要顺序消费的消息存储到同一个队列中MessageQueueSelector selector new MessageQueueSelector() {Overridepublic MessageQueue select(ListMessageQueue list, Message message, Object o) {System.out.println(队列的个数 list.size()); // 队列个数4Long orderId (Long)o; // 传入的参数int index (int)(orderId % list.size());return list.get(index);}};// 发送消息for (OrderStep orderStep : orderSteps) {Message msg new Message(topic, orderStep.toString().getBytes(Charset.defaultCharset()));// 指定消息选择器传入的参数producer.send(msg, selector, orderStep.getOrderId()); // 将订单号传入选择器}System.out.println(消息发送完毕);// 关闭资源producer.shutdown();} }消费者代码如下 public class Consumer {public static void main(String[] args) throws MQClientException {// 定义消息消费者(在同一个JVM中消费者的组名不能重复)DefaultMQPushConsumer consumer new DefaultMQPushConsumer(orderlyProducerGroup);// 设置nameSever地址consumer.setNamesrvAddr(192.168.63.101:9876);// 设置订阅的主题consumer.subscribe(orderTopic, *);// 从什么地方开始消费consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 一个队列对应的一个线程consumer.setMessageListener(new MessageListenerOrderly() {Overridepublic ConsumeOrderlyStatus consumeMessage(ListMessageExt list,ConsumeOrderlyContext consumeOrderlyContext) {for (MessageExt msg : list) {System.out.println(当前线程ID Thread.currentThread().getId() 队列ID msg.getQueueId() 消息内容 new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});// 启动的消费者consumer.start();} }模拟数据 Setter Getter public class OrderStep {private long orderId;private String desc;Overridepublic String toString() {return OrderStep{ orderId orderId , desc desc \ };} }public class OrderUtil {/*** 生成模拟订单数据*/public static ListOrderStep buildOrders() {ListOrderStep orderList new ArrayList();OrderStep orderDemo new OrderStep();orderDemo.setOrderId(101L);orderDemo.setDesc(创建);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(102L);orderDemo.setDesc(创建);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(101L);orderDemo.setDesc(付款);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(103L);orderDemo.setDesc(创建);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(102L);orderDemo.setDesc(付款);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(103L);orderDemo.setDesc(付款);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(102L);orderDemo.setDesc(完成);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(101L);orderDemo.setDesc(推送);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(103L);orderDemo.setDesc(完成);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(101L);orderDemo.setDesc(完成);orderList.add(orderDemo);return orderList;} }控制台打印 从控制台打印效果来看102 订单全部存储在 ID 为 2 的队列当中并且实现了顺序消费。 3.5 延迟消息 延时消息是 RocketMQ 延时发送给消费者消费的消息典型应用场景如订单超时未支付等。其不支持任意时间的延时需要设置几个固定的延时等级从 1s 到 2h 分别对应着等级 1 到 18。 等级123456789101112131415161718延时1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h 生产者 public class Producer {public static void main(String[] args) throws Exception {// 定义一个生产者对象DefaultMQProducer producer new DefaultMQProducer(helloGroup);// 连接nameSeverproducer.setNamesrvAddr(192.168.63.101:9876);// 启动生产者producer.start();// 设置消息发送的目的地String topic helloTopic;// 发送消息SimpleDateFormat cusFormat new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);Message message new Message(topic, (延时消息发送时间 cusFormat.format(new Date())).getBytes(Charset.defaultCharset()));// 设置消息延时级别message.setDelayTimeLevel(3);// 发送完成之后会返回响应结果SendResult result producer.send(message);System.out.println(消息发送完毕);// 关闭资源producer.shutdown();} }消费者 public class Consumer {public static void main(String[] args) throws MQClientException {// 定义消息消费者(在同一个JVM中消费者的组名不能重复)DefaultMQPushConsumer consumer new DefaultMQPushConsumer(delayConsumerGroup);// 设置nameSever地址consumer.setNamesrvAddr(192.168.63.101:9876);// 设置订阅的主题consumer.subscribe(helloTopic, *);// 设置消息的监听器consumer.setMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {SimpleDateFormat cusFormat new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);for (MessageExt msg : list) {System.out.println(消费时间 cusFormat.format(new Date()) 消息内容 new String(msg.getBody(), Charset.defaultCharset()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动的消费者consumer.start();} }控制台打印 3.6 消息过滤 3.6.1 Tag 过滤 RocketMQ 的消息标签Message Tag是一种简单的路由机制允许消费者根据标签来过滤并只消费感兴趣的消息。要实现消息标签的过滤需要在发送消息时设置标签并在消费者端配置标签过滤器。以下示例展示如何使用的标签过滤功能 生产者设置消息标签并发送消息 public class Producer {public static void main(String[] args) throws Exception {// 定义一个生产者对象DefaultMQProducer producer new DefaultMQProducer(tagProducesGroup);// 连接nameSeverproducer.setNamesrvAddr(192.168.63.101:9876);// 启动生产者producer.start();// 设置消息发送的目的地String topic tagFilterTopic;// 发送消息Message message1 new Message(topic, TagA, (消息A).getBytes(Charset.defaultCharset()));Message message2 new Message(topic, TagB, (消息B).getBytes(Charset.defaultCharset()));Message message3 new Message(topic, TagC, (消息C).getBytes(Charset.defaultCharset()));producer.sendOneway(message1);producer.sendOneway(message2);producer.sendOneway(message3);System.out.println(消息发送完毕);// 关闭资源producer.shutdown();} }消费者端配置标签过滤器 public class Consumer {public static void main(String[] args) throws MQClientException {// 定义消息消费者(在同一个JVM中消费者的组名不能重复)DefaultMQPushConsumer consumer new DefaultMQPushConsumer(tagFilterConsumerGroup);// 设置nameSever地址consumer.setNamesrvAddr(192.168.63.101:9876);// 设置订阅的主题consumer.subscribe(tagFilterTopic, TagA || TagC); // 只消费 TagA 和 TagCconsumer.setMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : list) {System.out.println(消息内容 new String(msg.getBody(), Charset.defaultCharset()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动的消费者consumer.start();} }控制台打印 3.6.2 SQL92 过滤 RocketMQ 的 SQL92 过滤器是一种基于消息属性的条件筛选机制允许消费者只接收满足特定条件的消息。要使用 SQL92 过滤器需要在消费者端设置过滤条件。以下是示例演示如何在消费者端设置 SQL92 过滤器 生产者设添加属性 putUserProperty(key, value) public class Producer {public static void main(String[] args) throws Exception {// 定义一个生产者对象DefaultMQProducer producer new DefaultMQProducer(sqlProducesGroup);// 连接nameSeverproducer.setNamesrvAddr(192.168.63.100:9876);// 启动生产者producer.start();// 设置消息发送的目的地String topic sqlFilterTopic;// 发送消息Message message1 new Message(topic, TagA, (消息A).getBytes(Charset.defaultCharset()));message1.putUserProperty(age, 22);message1.putUserProperty(weight, 45);Message message2 new Message(topic, TagB, (消息B).getBytes(Charset.defaultCharset()));message2.putUserProperty(age, 30);message2.putUserProperty(weight, 50);Message message3 new Message(topic, TagC, (消息C).getBytes(Charset.defaultCharset()));message3.putUserProperty(age, 15);message3.putUserProperty(weight, 48);producer.sendOneway(message1);producer.sendOneway(message2);producer.sendOneway(message3);System.out.println(消息发送完毕);// 关闭资源producer.shutdown();} }消费者设置过滤条件 public class Consumer {public static void main(String[] args) throws MQClientException {// 定义消息消费者(在同一个JVM中消费者的组名不能重复)DefaultMQPushConsumer consumer new DefaultMQPushConsumer(sqlFilterConsumerGroup);// 设置nameSever地址consumer.setNamesrvAddr(192.168.63.101:9876);// 设置订阅的主题consumer.subscribe(sqlFilterTopic, MessageSelector.bySql(age25 and weight47));// 一个队列对应的一个线程consumer.setMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : list) {System.out.println(消息内容 new String(msg.getBody(), Charset.defaultCharset()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动的消费者consumer.start();} }注意: ① 过滤条件支持以下形式 数值比较如BETWEEN 字符比较如IN IS NULL 或者 IS NOT NULL 逻辑符号 ANDORNOT 常量支持类型为 数值如**1233.1415 字符如‘abc’必须用单引号包裹起来 NULL特殊的常量 布尔值TRUE 或 FALSE ② 在使用 SQL 过滤的时候, 需要修改配置文件 broker.conf添加参数enablePropertyFiltertrue重启 broker 代理服务器。 vi /usr/local/rocketmq-4.4/conf/broker.conf 四、SpringBoot 集成 RocketMQ 4.1 入门案例 1、添加 pom 依赖 dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-spring-boot-starter/artifactIdversion2.0.4/version /dependency2、生产者模块添加配置 rocketmq:name-server: 192.168.63.101:9876producer:group: my-group3、消费者模块添加配置 rocketmq:name-server: 192.168.63.101:98764、生产者模块生产消息 SpringBootTest public class RocketMQTest {Autowiredprivate RocketMQTemplate rocketMQTemplate;Testpublic void sendMsg() {MessageString msg MessageBuilder.withPayload(发送消息).build();rocketMQTemplate.send(helloTopicBoot, msg);} }注意① Message 对象是 Spring 框架提供的对象 import org.springframework.messaging.Message;    ② rocketMQTemplate.send(destination, message) 方法是同步的发送方式。 5、消费者模块消费消息 Component // 消费者名字叫 helloConsumerGroup消费的生产组叫 helloTopicBoot RocketMQMessageListener(consumerGroup helloConsumerGroup, topic helloTopicBoot, messageModel MessageModel.BROADCASTING) public class HelloTopicBootListener implements RocketMQListenerMessageExt {Overridepublic void onMessage(MessageExt messageExt) {System.out.println(收到消息 new String(messageExt.getBody(), Charset.defaultCharset()));} }4.2 消息发送方式 下文不阐述具体的发送细节细节参考上文。 4.2.1 同步消息 Test public void sendSYNMsg() throws InterruptedException {MessageString msg MessageBuilder.withPayload(发送同步消息).build();rocketMQTemplate.syncSend(helloTopicBoot, msg); }4.2.2 异步消息 Test public void sendASYNMsg() throws InterruptedException {MessageString msg MessageBuilder.withPayload(发送异步消息).build();rocketMQTemplate.asyncSend(helloTopicBoot, msg, new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {System.out.println(发送状态 sendResult.getSendStatus());}Overridepublic void onException(Throwable throwable) {System.out.println(消息发送失败);}});TimeUnit.SECONDS.sleep(5); }4.2.3 一次性消息 Test public void sendOnewayMsg() {MessageString msg MessageBuilder.withPayload(发送一次性消息).build();rocketMQTemplate.sendOneWay(helloTopicBoot, msg); }4.3 消息消费方式 这里模拟生产者发送一次性消息10次 Test public void sendOnewayMsgLoop() {for (int i 0; i 10; i) {MessageString msg MessageBuilder.withPayload(发送一次性消息 i).build();rocketMQTemplate.sendOneWay(helloTopicBoot, msg);} }4.3.1 集群模式 默认情况下消费者采用负载均衡方式消费消息即采用集群模式也可以在 RocketMQMessageListener 注解中设置 messageModel 属性来改变消费模式。 Component RocketMQMessageListener(consumerGroup helloConsumerGroup, topic helloTopicBoot,messageModel MessageModel.CLUSTERING) // 设置消费模式 public class HelloTopicBootListener implements RocketMQListenerMessageExt {Overridepublic void onMessage(MessageExt messageExt) {System.out.println(收到消息 new String(messageExt.getBody(), Charset.defaultCharset()));} }控制台打印 4.3.2 广播模式 设置广播模式messageModel MessageModel.BROADCASTING Component RocketMQMessageListener(consumerGroup helloConsumerGroup, topic helloTopicBoot,messageModel MessageModel.BROADCASTING) // 设置消费模式 public class HelloTopicBootListener implements RocketMQListenerMessageExt {Overridepublic void onMessage(MessageExt messageExt) {...} }控制台打印 4.4 顺序消息 生产者设置队列选择器需要将顺序消息放在同一个队列 Test public void sendOrderlyMsg() {// 设置队列的选择器// 将需要顺序消费的消息存储到同一个队列中rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {Overridepublic MessageQueue select(ListMessageQueue list, org.apache.rocketmq.common.message.Message message, Object o) {String orderIdStr (String)o; // 传入参数Long orderId Long.valueOf(orderIdStr);int index (int)(orderId % list.size());return list.get(index);}});ListOrderStep orderSteps OrderUtil.buildOrders();// 发送消息for (OrderStep step : orderSteps) {MessageString msg MessageBuilder.withPayload(step.toString()).build();rocketMQTemplate.sendOneWayOrderly(orderlyTopicBoot, msg, String.valueOf(step.getOrderId()));} }消费者默认一个队列是线程并发消费可以通过设置 consumeMode ConsumeMode.ORDERLY将一个消息队列对应消费者的一个线程以实现顺序消费 Component RocketMQMessageListener(consumerGroup orderlyConsumerBoot, topic orderlyTopicBoot,consumeMode ConsumeMode.ORDERLY) // 设置一个队列对应一个线程 public class OrderlyTopicListener implements RocketMQListenerMessageExt {Overridepublic void onMessage(MessageExt messageExt) {System.out.println(当前线程 Thread.currentThread().getId() 队列ID messageExt.getQueueId() 消息内容 new String(messageExt.getBody(), Charset.defaultCharset()));} }控制台打印 4.5 延时消息 Test public void sendDelayMsg() {SimpleDateFormat cusFormat new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);MessageString msg MessageBuilder.withPayload(发送延时消息发送时间 cusFormat.format(new Date())).build();// 设置延时等级3这个消息将在 10s 之后发送(详看delayTimeLevel)消息在任务队列里存储10s 后发送// 3000 代表同步等待 3s若超过 3s 消息队列都没有响应自动断开链接rocketMQTemplate.syncSend(helloTopicBoot, msg, 3000, 3); }4.6 消息过滤 4.6.1 Tag 过滤 生产者生产消息Topic 和 Tag 以 “:” 分割 “:” 前后不能有空格 Test public void sendTagFilterMsg() {MessageString msg1 MessageBuilder.withPayload(消息A).build();rocketMQTemplate.send(tagFilterBoot:TagA, msg1);MessageString msg2 MessageBuilder.withPayload(消息B).build();rocketMQTemplate.send(tagFilterBoot:TagB, msg2);MessageString msg3 MessageBuilder.withPayload(消息C).build();rocketMQTemplate.send(tagFilterBoot:TagC, msg3); }消费者设置过滤条件 Component RocketMQMessageListener(consumerGroup tagFilterConsumerBoot, topic tagFilterBoot, selectorExpression TagA || TagC) // selectorExpression 过滤条件 public class TagFilterTopicListener implements RocketMQListenerMessageExt {Overridepublic void onMessage(MessageExt messageExt) {System.out.println(消息内容 new String(messageExt.getBody(), Charset.defaultCharset()));} }控制台打印 4.6.2 SQL92 过滤 使用 setHeader(String headerName, Object headerValue) 方法设置过滤条件 Test public void sendSQL92FilterMsg() {MessageString msg1 MessageBuilder.withPayload(美女A).setHeader(age, 22).setHeader(weight, 90).build();rocketMQTemplate.send(SQL92FilterBoot, msg1);MessageString msg2 MessageBuilder.withPayload(美女B).setHeader(age, 20).setHeader(weight, 100).build();rocketMQTemplate.send(SQL92FilterBoot, msg2);MessageString msg3 MessageBuilder.withPayload(美女C).setHeader(age, 25).setHeader(weight, 120).build();rocketMQTemplate.send(SQL92FilterBoot, msg3); }消费者设置 selectorType 与 selectorExpression Component RocketMQMessageListener(consumerGroup SQL92FilterConsumerBoot, topic SQL92FilterBoot,selectorType SelectorType.SQL92, selectorExpression age25 and weight90) // 设置 public class Sql92FilterTopicListener implements RocketMQListenerMessageExt {Overridepublic void onMessage(MessageExt messageExt) {System.out.println(消息内容 new String(messageExt.getBody(), Charset.defaultCharset()));} }控制台打印 注在使用 SQL 过滤的时候, 需要修改配置文件 broker.conf具体参考 3.6.2 文章参考Java微服务商城高并发秒杀项目实战|Spring Cloud Alibaba真实项目实战商城双11秒杀高并发消息支付分布式事物Seata
http://www.w-s-a.com/news/213287/

相关文章:

  • wordpress get值网站建设 seo sem
  • 网站建设微信开发工厂代加工平台
  • 厦门 网站建设 公司哪家好asp.net 创建网站
  • 专业北京网站建设凡科网做网站怎么样
  • 金富通青岛建设工程有限公司网站浙江省住建厅四库一平台
  • 有搜索引擎作弊的网站企业建设H5响应式网站的5大好处6
  • 是做网站编辑还是做平面设计seo外包公司接单
  • 做性的网站有哪些苏州专业网站设计制作公司
  • 陵水网站建设友创科技十大优品店排名
  • 想换掉做网站的公司简要说明网站制作的基本步骤
  • 国企公司网站制作wordpress 浮动定位
  • 网站网页直播怎么做的企业网站建设推荐兴田德润
  • 网站建设熊猫建站厦门seo全网营销
  • 扁平网站设计seo是什么岗位的缩写
  • 工商企业网站群晖配置wordpress 80端口
  • 企业网站建设流程步骤镇江东翔网络科技有限公司
  • 网络工程师和做网站哪个难网络建站如何建成
  • 网站建设需要哪些项目游民星空是用什么做的网站
  • 旅游网站建设要如何做百度商城网站建设
  • destoon 网站搬家中国企业500强都有哪些企业
  • 商城网站前端更新商品天天做吗哈尔滨做网站优化
  • 新乡网站开发wordpress 产品分类侧边栏
  • 网站自己做自己的品牌好做互联网企业分类
  • 项目网站建设方案石家庄网站快速排名
  • 网站开发大作业报告做电商网站的参考书
  • Apache局域网网站制作wordpress外链自动保存
  • 网站备案号要怎么查询千锋教育培训机构地址
  • 门户网站建设要求几款免费流程图制作软件
  • 花生壳域名可以做网站域名吗wordpress内链工具
  • 猎头公司网站模板网站伪静态作用