广州网站建设外包,打字网站怎么做,温州网站制作价格,wordpress 引号转义目录
一、概念
二、出现死信的原因
三、实战
#xff08;一#xff09;代码架构图
#xff08;二#xff09;消息被拒
#xff08;三#xff09;消息TTL过期
#xff08;四#xff09;队列达到最大长度 一、概念 先从概念解释上搞清楚这个定义#xff0c;死信一代码架构图
二消息被拒
三消息TTL过期
四队列达到最大长度 一、概念
先从概念解释上搞清楚这个定义死信顾名思义就是无法被消费的消息字面意思可以这样理 解一般来说producer 将消息投递到 broker 或者直接到 queue 里了consumer 从 queue 取出消息进行消费但某些时候由于特定的原因导致 queue 中的某些消息无法被消费这样的消息如果没有后续的处理就变成了死信有死信自然就有了死信队列。 应用场景:为了保证订单业务的消息数据不丢失需要使用到 RabbitMQ 的死信队列机制当消息 消费发生异常时将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效 二、出现死信的原因
消息 TTL 过期 队列达到最大长度(队列满了无法再添加数据到 mq 中) 消息被拒绝(basic.reject 或 basic.nack)并且 requeuefalse.
三、实战
一代码架构图 二消息被拒
生产者
public class Producer {public static final String NORMAL_EXCHANGE normal_exchange;public static void main(String[] args) throws IOException {Channel channel RabbitMqUtils.getChannel();for (int i 0; i 10; i) {String message info i;channel.basicPublish(NORMAL_EXCHANGE, zhangsan, null, message.getBytes());}}
}
C1 消费者代码(启动之后关闭该消费者 模拟其接收不到消息) public class Consumer01 {public static final String NORMAL_QUEUE normal_queue;public static final String NORMAL_EXCHANGE normal_exchange;public static final String DEAD_QUEUE dead_queue;public static final String DEAD_EXCHANGE dead_exchange;public static void main(String[] args) throws IOException {Channel channel RabbitMqUtils.getChannel();// 声明普通和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);// 声明死信队列channel.queueDeclare(DEAD_QUEUE, false, false, false, null);// 死信的绑定channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, lisi);MapString, Object arguments new HashMap();// 普通队列设置对应的交换机arguments.put(x-dead-letter-exchange, DEAD_EXCHANGE);// 设置死信队列的RouteKeyarguments.put(x-dead-letter-routing-key, lisi);// 设置队列最大长度arguments.put(x-max-length, 6);// 声明普通队列channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);// 普通的绑定channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, zhangsan);DeliverCallback deliverCallback (consumerTag, message) - {String msg new String(message.getBody());if (msg.equals(info5)) {System.out.println(Consumer01接收到消息 message 并拒绝签收该消息);channel.basicReject(message.getEnvelope().getDeliveryTag(), false);} else {System.out.println(consumer01接收到消息 msg);channel.basicAck(message.getEnvelope().getDeliveryTag(), false);}};channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag - {});}
} C2 消费者代码不变 启动消费者 1 然后再启动消费者 2 三消息TTL过期
生产者代码public class Producer {public static final String NORMAL_EXCHANGE normal_exchange;public static void main(String[] args) throws IOException {Channel channel RabbitMqUtils.getChannel();AMQP.BasicProperties properties new AMQP.BasicProperties().builder().expiration(10000).build();for (int i 0; i 10; i) {String message info i;channel.basicPublish(NORMAL_EXCHANGE, zhangsan, properties, message.getBytes());}}
} 消费者 C1 代码(启动之后关闭该消费者 模拟其接收不到消息) public class Consumer01 {public static final String NORMAL_QUEUE normal_queue;public static final String NORMAL_EXCHANGE normal_exchange;public static final String DEAD_QUEUE dead_queue;public static final String DEAD_EXCHANGE dead_exchange;public static void main(String[] args) throws IOException {Channel channel RabbitMqUtils.getChannel();// 声明普通和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);// 声明死信队列channel.queueDeclare(DEAD_QUEUE, false, false, false, null);// 死信的绑定channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, lisi);MapString, Object arguments new HashMap();// 普通队列设置对应的交换机arguments.put(x-dead-letter-exchange, DEAD_EXCHANGE);// 设置死信队列的RouteKeyarguments.put(x-dead-letter-routing-key, lisi);// 声明普通队列channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);// 普通的绑定channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, zhangsan);DeliverCallback deliverCallback (consumerTag, message) - {String msg new String(message.getBody());System.out.println(consumer01接收到消息 msg);};channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag - {});}
} 消费者 C2 代码(以上步骤完成后 启动 C2 消费者 它消费死信队列里面的消息) public class Consumer02 {public static final String DEAD_QUEUE dead_queue;public static void main(String[] args) throws IOException {Channel channel RabbitMqUtils.getChannel();System.out.println(等待接收死信队列消息.....);DeliverCallback deliverCallback (consumerTag, message) - {System.out.println(Consumer02 接收死信队列的消息 new String(message.getBody()));};channel.basicConsume(DEAD_QUEUE, true, deliverCallback, consumerTag - {});}
} 四队列达到最大长度
我们在声明普通队列时添加一个参数x-max-length即可
生产者
public class Producer {public static final String NORMAL_EXCHANGE normal_exchange;public static void main(String[] args) throws IOException {Channel channel RabbitMqUtils.getChannel();for (int i 0; i 10; i) {String message info i;channel.basicPublish(NORMAL_EXCHANGE, zhangsan, null, message.getBytes());}}
}
C1 消费者修改以下代码(启动之后关闭该消费者 模拟其接收不到消息)这里设置了队列最多容纳6条消息此时由于生产者发送了10条消息所以有4条会进入死信队列。public class Consumer01 {public static final String NORMAL_QUEUE normal_queue;public static final String NORMAL_EXCHANGE normal_exchange;public static final String DEAD_QUEUE dead_queue;public static final String DEAD_EXCHANGE dead_exchange;public static void main(String[] args) throws IOException {Channel channel RabbitMqUtils.getChannel();// 声明普通和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);// 声明死信队列channel.queueDeclare(DEAD_QUEUE, false, false, false, null);// 死信的绑定channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, lisi);MapString, Object arguments new HashMap();// 普通队列设置对应的交换机arguments.put(x-dead-letter-exchange, DEAD_EXCHANGE);// 设置死信队列的RouteKeyarguments.put(x-dead-letter-routing-key, lisi);// 设置队列最大长度arguments.put(x-max-length, 6);// 声明普通队列channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);// 普通的绑定channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, zhangsan);DeliverCallback deliverCallback (consumerTag, message) - {String msg new String(message.getBody());System.out.println(consumer01接收到消息 msg);};channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag - {});}
} 注意此时需要把原先队列删除 因为参数改变了 C2 消费者代码不变(启动 C2 消费者) 总结 死信队列可能出现的三种情况为 ①消息被拒消费者方拒绝签收该消息 ②消息设置的TTL过期生产者方设置的过期时间 ③消息投放的队列内消息已经满了放不进入时消息会进入死信队列