宝安高端网站建设公司,做音乐网站之前的准备,网上培训,加速器国外一、死信的概念
先从概念解释上搞清楚这个定义#xff0c;死信#xff0c;顾名思义就是无法被消费的消息#xff0c;字面意思可以这样理 解#xff0c;一般来说#xff0c;producer 将消息投递到 broker 或者直接到 queue 里了#xff0c;consumer 从 queue 取出消息进行…一、死信的概念
先从概念解释上搞清楚这个定义死信顾名思义就是无法被消费的消息字面意思可以这样理 解一般来说producer 将消息投递到 broker 或者直接到 queue 里了consumer 从 queue 取出消息进行消费但某些时候由于特定的原因导致 queue 中的某些消息无法被消费这样的消息如果没有后续的处理就变成了死信有死信自然就有了死信队列。应用场景:为了保证订单业务的消息数据不丢失需要使用到 RabbitMQ 的死信队列机制当消息 消费发生异常时将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
二、死信的来源
1、消息 TTL 过期 2、队列达到最大长度(队列满了无法再添加数据到 mq 中) 3、消息被拒绝(basic.reject 或 basic.nack)并且 requeuefalse.
三、死信实战
代码架构图 生产者正常情况下走的是普通的交换机这个交换机的类型是 direct 它和普通队列之间的关系是一个叫 zhangsan 的路由 key 正常情况下会被 C1 消费。但是发生了上面所说的三种情况中的一种成为了死信然后被转换到死信交换机中这个死信交换机也是 direct 类型它们之间的 routingKey 是 lisi,然后就进入了死信队列死信队列由 C2 消费。
消息 TTL 过期
1.application.yml 配置
spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: metieVirtualHosts
# listener:
# simple:
# retry:
# ### 开启消费者程序出现异常的情况下会进行重试
# enabled: true
# ### 最大重试次数
# max-attempts: 5
# ### 重试间隔次数
# initial-interval: 3000
# acknowledge-mode: manualserver:port: 8080###死信队列
mayikt:dlx:exchange: mayikt_dlx_exchangequeue: mayikt_order_dlx_queueroutingkey: dlx### 订单order:exchange: mayikt_order_exchangequeue: mayikt_order_queueroutingkey: mayikt.order
2MQConfig配置
package com.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;/*** description:* author: huangan* date: 2024/11/23 20:48*/
Component
public class DeadLetterMQConfig {/*** 订单交换机*/Value(${mayikt.order.exchange})private String orderExchange;/*** 订单队列*/Value(${mayikt.order.queue})private String orderQueue;/*** 订单路由key*/Value(${mayikt.order.routingkey})private String orderRoutingkey;/*** 死信交换机*/Value(${mayikt.dlx.exchange})private String dlxExchange;/*** 死信队列*/Value(${mayikt.dlx.queue})private String dlxQueue;/*** 死信路由*/Value(${mayikt.dlx.routingkey})private String dlxRoutingkey;/*** 声明死信交换机* return*/Beanpublic DirectExchange dlxExchange(){return new DirectExchange(dlxExchange);}/*** 声明死信队列* return*/Beanpublic Queue dlxQueue(){return new Queue(dlxQueue);}/*** 声明订单业务交换机* return*/Beanpublic DirectExchange orderExchange(){return new DirectExchange(orderExchange);}/*** 声明订单队列* return*/Beanpublic Queue orderQueue(){//订单队列要绑定死信交换机MapString,Object arguments new HashMap(2);arguments.put(x-dead-letter-exchange,dlxExchange);arguments.put(x-dead-letter-routing-key,dlxRoutingkey);return new Queue(orderQueue,true,false,false,arguments);}/*** 绑定死信队列到死信交换机* return*/Beanpublic Binding binding(){return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(dlxRoutingkey);}/*** 绑定订单队列到订单交换机* return*/Beanpublic Binding orderBinding(){return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(orderRoutingkey);}
}3.生产者OrderProducer
package com.producer;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** description:* author: huangan* date: 2024/11/23 21:58*/
RestController
public class OrderProducer {Autowiredprivate RabbitTemplate rabbitTemplate;/*** 订单交换机*/Value(${mayikt.order.exchange})private String orderExchange;/*** 订单路由key*/Value(${mayikt.order.routingkey})private String orderRoutingkey;RequestMapping(/sendOrder)public String sendOrder(){String msg 测试数据;rabbitTemplate.convertAndSend(orderExchange,orderRoutingkey,msg,message - {//设置消息过期时间10秒过期message.getMessageProperties().setExpiration(10000);return message;});return success;}}4.订单消费者
package com.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** description: 订单队列* author: huangan* date: 2024/11/23 22:13*/
Component
Slf4j
public class OrderConsumer {/*** 监听队列回调方法* param msg*/RabbitListener(queues mayikt_order_queue)public void orderConsumer(String msg){log.info(正常订单消费者消费MSG:{},msg);}
}5.死信消费者
package com.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** description: 死信队列* author: huangan* date: 2024/11/23 22:17*/
Slf4j
Component
public class OrderDlxConsumer {/*** 监听队列回调方法* param msg*/RabbitListener(queues mayikt_order_dlx_queue)public void orderConsumer(String msg){log.info(死信队列消费订单消费者消费MSG:{},msg);}
}