做电商必备的八个软件,qq群排名优化软件官网,深圳公司宣传片制作,镇江网络违法网站1.消费端消息可靠性保证#xff1a;
消息确认#xff08;Acknowledgements#xff09;#xff1a;(自动(默认),手动)
消费者在接收到消息后#xff0c;默认情况下RabbitMQ会自动确认消息#xff08;autoAcktrue#xff09;。为保证消息可靠性#xff0c;可以设置auto…1.消费端消息可靠性保证
消息确认Acknowledgements(自动(默认),手动)
消费者在接收到消息后默认情况下RabbitMQ会自动确认消息autoAcktrue。为保证消息可靠性可以设置autoAckfalse使得消费者在处理完消息后手动发送确认basicAck。如果消费者在处理过程中发生异常或者未完成处理就终止运行那么消息在超时时间内将不会被删除会再次被RabbitMQ投递给其他消费者。缺点代码冗余多容易出现死循环。
死信队列Dead Letter Queue
当消息不能被正常消费时比如达到最大重试次数可以通过设置TTLTime To Live或者死信交换器Dead Letter Exchange将消息路由至死信队列从而有机会后续分析和处理这些无法正常消费的消息。人工干预
2.生产端消息可靠性保证 消息持久化
当生产者发布消息时,可以选择将其标记为持久化persistent).这意味着即使 RabbitMQ 服务器重启消息也不会丢失因为它们会被存储在磁盘上。(默认就是这) 2.确认Confirm机制(发布者确认Publisher Confirms)
开启confirm回调模式后RabbitMQ会在消息成功写入到磁盘并至少被一个交换器接受后向生产者发送一个确认acknowledgement。若消息丢失或无法投递给任何队列RabbitMQ将会发送一个否定确认nack). 生产者可以根据这些确认信号判断消息是否成功送达并采取相应的重试策略。 RabbitMQ作为消息中间件并启用publisher confirms发布者确认与 publisher returns发布者退回机制时可以确保消息从生产者到交换机的投递过程得到更准确的状态反馈。 发布者确认-Publisher Confirms 作用 Publisher Confirm机制允许RabbitMQ服务器通知生产者一个消息是否已经被交换机正确接收。当publisher-confirm-type设置为CORRELATED时RabbitMQ会向生产者发送确认或否定响应确认消息已到达交换机但不保证消息已被路由到至少一个队列中。 2.1.配置
spring.rabbitmq.publisher-confirm-type CORRELATEDsetConfirmCallback: (写到交换机,b为true.没写到交换机b为false)
发布者退回-Publisher Returns 作用 Publisher Return机制用于当消息无法按照路由键规则路由到任何队列时或者由于其他原因例如队列满、消息过大等而被交换机拒绝时RabbitMQ将消息返回给生产者。 交换机到队列的确认消息是否正常发送到了其中任何一个队列 通过实现 ReturnCallback 接口发送消息失败返回比如交换机路由不到队列时触发回调 1.只有消息没有路由到队列的时候才触发该回调 . 2.只要有一个队列接受到消息了它就认为成功. 配置
spring.rabbitmq.publisher-returns true
returnedMessage: (这是找到交换机了,但是路由错了,没有找到对列;这在直连交换机能测出来,广播交换机没路由,只要有绑定的队列就能发送成功)
完整代码
Service
Slf4j
public class ConfirmProvider {Autowiredprivate RabbitTemplate rabbitTemplate;public void send(OrderingOk msg) {rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {//没找到交换机就会falseif (b){String id correlationData.getId();log.info(消息发送成功);}else {log.info(消息发送失败);}}});rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {log.info(消息发送失败);log.info(消息主体: {}, message);log.info(应答码: {}, i);log.info(描述{}, s);log.info(消息使用的交换器 exchange : {}, s1);log.info(消息使用的路由键 routing : {}, s2);}});CorrelationData correlationData new CorrelationData(980520);rabbitTemplate.convertAndSend(d_ex01, erew, msg, correlationData);}
}代码模版: public void send(OrderingOk msg){// 设置确认回调rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/*** 确认消息是否被交换机接收。** param correlationData 包含消息相关数据的对象用于识别消息的唯一性。* param ack 表示消息是否被交换机确认接收。* param cause 如果消息未被接收提供未接收的原因。*/Override //线程Bpublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if(ack){//该订单状态 10 - 20String id correlationData.getId();// 通过这个id改订单状态} else {log.error({],cause);}}});// 设置退回回调, 之后投递失败的时候才会触发rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {/*** 记录被交换机退回的消息信息。** param message 消息对象包含消息体。* param replyCode 返回的响应代码用于指示退回的原因。* param replyText 返回的响应文本提供关于退回的详细信息。* param exchange 退回时涉及的交换机名称。* param routingKey 退回时使用的路由键。*/Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println(Message was returned: new String(message.getBody()));System.out.println(Reply code: replyCode);System.out.println(Reply text: replyText);System.out.println(Exchange: exchange);System.out.println(Routing key: routingKey);}});//CorrelationData 创建一个关联数据用于消息的跟踪大部分都是业务单据的idCorrelationData correlationData new CorrelationData(201408145676676);rabbitTemplate.convertAndSend(ordering_ok,,msg,correlationData);}