安陆网站的建设,上海市建设工程 安全协会网站,虎丘网站建设,Wordpress主题里的幻灯片怎么设置原文首发于公众号【CSJerry】
在现代分布式系统中#xff0c;消息队列扮演着至关重要的角色。它们可以实现应用程序之间的异步通信#xff0c;并确保数据的可靠传输和处理。而在这个领域中#xff0c;RabbitMQ作为一种强大而受欢迎的消息队列解决方案#xff0c;具备了高…原文首发于公众号【CSJerry】
在现代分布式系统中消息队列扮演着至关重要的角色。它们可以实现应用程序之间的异步通信并确保数据的可靠传输和处理。而在这个领域中RabbitMQ作为一种强大而受欢迎的消息队列解决方案具备了高效、可靠和灵活的特性。
然而即使使用了RabbitMQ我们仍然会遇到一些不可预料的情况例如消费者无法处理某些消息、消息过期或者队列溢出等。为了解决这些问题RabbitMQ引入了死信队列Dead Letter Queue的概念为开发人员提供了一种有效的错误处理机制。
那么究竟什么是死信队列呢
本文结合Spring Boot使用RabbitMQ的死信队列着重从是什么、为什么、怎么用几个方面对死信队列进行简单介绍。
1. 是什么
死信队列Dead Letter Queue是一种特殊的消息队列用于存储无法被消费的消息。当消息满足某些条件无法被正常消费时将被发送到死信队列中进行处理。死信队列提供了一种延迟处理、异常消息处理等场景的解决方案。
2. 为什么
用来处理消费者无法正确处理的消息避免消息丢失或积压。实现延迟消息处理例如订单超时未支付可以将该消息发送到死信队列然后再进行后续处理。用于实现消息重试机制当消费者处理失败时将消息重新发送到死信队列进行重试。提高了系统的可伸缩性和容错性能够应对高并发和异常情况。
3. 怎么用
在Spring Boot中配置和使用死信队列 首先在pom.xml文件中添加RabbitMQ的依赖项。然后在application.properties文件中配置RabbitMQ连接信息。接下来创建生产者和消费者代码并通过注解将队列和交换机进行绑定。在队列的声明中添加死信队列的相关参数如x-dead-letter-exchange和x-dead-letter-routing-key等。最后在消费者中编写处理消息的逻辑包括对异常消息进行处理并设置是否重新发送到死信队列。 简而言之死信队列可以认为是一个正常队列的备用队列或者说是兜底队列当正常队列的消息无法消费的时候mq会重新把该消息发送到死信交换机由死信交换机根据路由键将消息投递到备用队列启动服务备用方案。 消息从正常队列到死信队列的三种情况 1、消息被否定确认使用 channel.basicNack 或 channel.basicReject 并且此时requeue 属性被设置为false。 2、消息在队列中的时间超过了设置的TTL()时间。 3、消息数量超过了队列的容量限制()。 当一个队列中的消息满足上述三种情况任一个时改消息就会从原队列移至死信队列若改队列没有绑定死信队列则消息被丢弃。 4. 实战
以下是一个简单的Spring Boot集成RabbitMQ的死信队列示例代码
配置
spring.rabbitmq.host127.0.0.1
spring.rabbitmq.port5672
spring.rabbitmq.usernamerabbit
spring.rabbitmq.password123456
# 开启消费者手动确认
spring.rabbitmq.listener.typedirect# 发送到队列失败时的手动处理
spring.rabbitmq.listener.direct.acknowledge-modemanual
spring.rabbitmq.publisher-returnstrue# 发送到交换机手动确认
spring.rabbitmq.publisher-confirm-typesimple配置类
Configuration
Slf4j
public class RabbitCof {Resourceprivate MqKeys mqKeys;Bean(normalQueue)public Queue normalQueue() {/*** 为普通队列绑定交换机*/MapString, Object args new HashMap();args.put(x-dead-letter-exchange, mqKeys.DIE_EXCHANGE);args.put(x-dead-letter-routing-key, mqKeys.DIE_ROUTING_KEY);args.put(x-message-ttl, 1000); // 队列中的消息未被消费则1秒后过期return new Queue(mqKeys.NORMAL_QUEUE, true, false, false, args);}Bean(normalExchange)public Exchange normalExchange() {return new DirectExchange(mqKeys.NORMAL_EXCHANGE);}Bean(normalBind)public Binding normalBinding(Qualifier(normalQueue) Queue normalQueue, Qualifier(normalExchange) Exchange normalExchange) {return BindingBuilder.bind(normalQueue).to(normalExchange).with(mqKeys.ROUTING_KEY).noargs();}/*** 死信队列* return*/Bean(dieQueue)public Queue dlQueue() {return new Queue(mqKeys.DIE_QUEUE, true, false, false);}/*** 死信交换机* return*/Bean(dieExchange)public Exchange dlExchange() {return new DirectExchange(mqKeys.DIE_EXCHANGE);}Bean(dieBind)public Binding dlBinding(Qualifier(dieQueue) Queue dlQueue, Qualifier(dieExchange) Exchange dlExchange) {return BindingBuilder.bind(dlQueue).to(dlExchange).with(mqKeys.DIE_ROUTING_KEY).noargs();}Resourceprivate ConnectionFactory connectionFactory;Beanpublic RabbitTemplate rabbitTemplate() {RabbitTemplate rabbitTemplate new RabbitTemplate(connectionFactory);/*** 消费者确认收到消息后手动ack回调处理* spring.rabbitmq.publisher-confirm-typesimple*/rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause)-{if(!ack) {log.info(消息投递到交换机失败correlationData{} ,ack{}, cause{}, correlationData null ? null : correlationData.getId(), ack, cause);} else {log.info(消息成功投递到交换机correlationData{} ,ack{}, cause{}, correlationData null ? null : correlationData.getId(), ack, cause);}});/*** 消息投递到队列失败回调处理* spring.rabbitmq.listener.direct.acknowledge-modemanual* spring.rabbitmq.publisher-returnstrue*/rabbitTemplate.setReturnsCallback((returnedMessage)-{Message message returnedMessage.getMessage();log.error(分发到到队列失败, body-{}, message.getBody());});return rabbitTemplate;}
}生产者类
Component
public class Producer {Resourceprivate MqKeys mqKeys;Resourceprivate RabbitTemplate rabbitTemplate;public void sendMessage(String message) {rabbitTemplate.convertAndSend(mqKeys.NORMAL_EXCHANGE, mqKeys.ROUTING_KEY, message);}
}消费者类
Component
RabbitListener(queues normal.queue)
Slf4j
public class Consumer {RabbitHandlerpublic void handleMessage(String data, Message message, Channel channel) {boolean success false;int retryCount 3;System.out.println(message.toString());long deliveryTag message.getMessageProperties().getDeliveryTag();while (!success retryCount-- 0){try {// 处理消息log.info(收到消息: {}, deliveryTag {}, data, deliveryTag);// 正常处理完毕手动确认此处不确认让他进入死信队列
// success true;
// channel.basicAck(deliveryTag, false);Thread.sleep(3 * 1000L);}catch (Exception e){log.error(程序异常{}, e.getMessage());}}// 达到最大重试次数后仍然消费失败if(!success){try {log.info(move to die queue);// 手动拒绝移至死信队列/***deliveryTag – the tag from the received AMQP.Basic.GetOk or AMQP.Basic.Delivermultiple – true to reject all messages up to and including the supplied delivery tag; false to reject just the supplied delivery tag.requeue – true if the rejected message(s) should be requeued rather than discarded/dead-lettered*/channel.basicNack(deliveryTag, false, false);} catch (IOException e) {e.printStackTrace();}}}
}以上代码演示了如何在Spring Boot中配置一个普通队列和一个死信队列然后通过生产者发送消息到普通队列在消费者中处理消息并模拟了当发生异常时将消息重新发送到死信队列。
参考连接 [rabbit 官网]Dead Letter Exchanges — RabbitMQ 具体代码仓库