上海商城网站开发,北京网站推广外包,商品网站建设实验报告,注册平台文章目录0. 什么是消息的可靠性投递1. confirm机制2. return机制3. 总结0. 什么是消息的可靠性投递
在生产环境中#xff0c;如果因为一些不明原因导致RabbitMQ重启#xff0c;RabbitMQ重启过程中是无法接收消息的#xff0c;那么我们就需要生产者重新发送消息。或者在消息…
文章目录0. 什么是消息的可靠性投递1. confirm机制2. return机制3. 总结0. 什么是消息的可靠性投递
在生产环境中如果因为一些不明原因导致RabbitMQ重启RabbitMQ重启过程中是无法接收消息的那么我们就需要生产者重新发送消息。或者在消息从交换机到队列的过程中出现意外消息没有正常投放我们需要消息回到交换机重新投放。
为了解决因为这些种种意外而产生的问题就需要使用消息的可靠性投递。我们需要在三个过程保证消息传输的正常
消息从生产者到交换机的过程消息从交换机到队列的过程消息从队列到消费者的过程
消息从生产者到交换机的过程中我们可以使用confirm机制来保障消息的可靠性。
消息从交换机到队列的过程中我们可以使用return机制来保障消息的可靠性。
消息从队列到消费者的过程中我们可以使用ack这个参数来保障消息的可靠性。
其中confirm机制和return机制都是使用回调函数当消息投放失败后在回调函数中将消息放入redis用定时任务重新投放。需要我们自己编码。
ack参数是RabbitMQ实现的我们需要手动ack。RabbitMQ将一条消息推送给消费者如果没有接收到ack就会重发。
1. confirm机制
使用confirm机制需要在配置文件中将rabbitmq.publisher-confirm-type设置为correlated。表示使用confirm机制。
publisher-confirm-type: correlatedpublisher-confirm-type有以下三种值
simple 简单的执行ack的判断在发布消息成功后使用 rabbitTemplate调用waitForConfirms或者waitForConfirmsOrDie方法等待broker节点返回发送结果根据返回结果来判断下一步的逻辑。但是要注意的是当 waitForConfirmsOrDie 方法如果返回false则会关闭channel。同步confirm性能较差correlated 打开消息确认机制 执行ack的时候还会携带消息的元数据。none 禁用发布确认模式默认值。
最后在RabbitTemplate中设置回调函数。这个函数不管是消息发送成功与否都会执行。该函数接收一个接口 ConfirmCallback此接口只有一个方法
void confirm(Nullable CorrelationData correlationData, boolean ack, Nullable String cause);correlationData 保存消息id以及相关信息 ack 交换机是否收到消息收到为true cause 消息接收不到原因成功接收消息则为null
在这个函数里可以判断ack如果为false则代表信息发送失败可以存入redis让定时任务扫描redis重新发送消息。解决方案有很多这里简单提个建议
Slf4j
Configuration
public class RabbitConfig {Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);rabbitTemplate.setConfirmCallback((correlationData, ack, cause) - {// 打印日志log.info(ConfirmCallback 相关数据 correlationData);log.info(ConfirmCallback 确认情况 ack);log.info(ConfirmCallback 原因 cause);// 存入数据库中定时重发.// ...});return rabbitTemplate;}
}总结
confirm机制有两步 在yml配置文件中将publisher-confirm-type设置为correlated开启confirm机制 publisher-confirm-type: correlated设置回调函数setConfirmCallback rabbitTemplate.setConfirmCallback((correlationData, ack, cause) - {// 打印日志log.info(ConfirmCallback 相关数据 correlationData);log.info(ConfirmCallback 确认情况 ack);log.info(ConfirmCallback 原因 cause);// 存入数据库中定时重发.// ...
});2. return机制
在了解return 机制前先了解一个参数 mandatory。
mandatory是AMQP协议中basic.publish方法中的标识位。
如果交换机根据路由键找不到对应的队列那么此时它有两个选择 将消息返回交换机、将消息丢弃当mandatory的值为true时消息将返回给交换机当mandatory的值为false时消息将被丢弃。
我们想要实现消息的可靠性投递当消息投放错时让消息重新投放那么我们就需要将mandatory的值设置为true。
所以实现return机制有三步
想使用return机制首先要在配置文件中将publisher-returns的值设置为true代表开启return机制
publisher-returns: true接下来将mandatory这个值设置为true代表如果消息丢失或者出现意外将消息返回而不是丢弃。
rabbitTemplate.setMandatory(true);最后实现ReturnCallback这个接口
void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey);message 此条消息replyCode 错误编码replyText 消息接收失败的原因exchange 此条消息的目标交换机routingKey 此条消息的路由键
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) - {// 打印日志log.info(ReturnCallback 消息 message);log.info(ReturnCallback 回应码 replyCode);log.info(ReturnCallback 回应信息 replyText);log.info(ReturnCallback 交换机 exchange);log.info(ReturnCallback 路由键 routingKey);// 做其他处理...
});执行到returnCallback函数就代表消息从交换机到队列的过程出现问题例如路由键错误、网络问题、找不到相应队列…这些情况下可以先打印日志再使用手动签收机制让交换机重新发送消息。
因为路由键错误这种就是代码写错了重发消息也没用只能打印日志等操作来尽可能提醒开发人员但是网络问题可以重新发送消息。
3. 总结
实现消息的可靠性投递通过confirm机制和return机制。 在配置文件中开启这两种机制 publisher-confirm-type: correlated
publisher-returns: true注入RabbitTemplate时将mandatory设置为true代表消息投递异常时将消息返回而不是丢弃。 注入RabbitTemplate时实现ConfirmCallback和ReturnCallback。在里面完成消息投放错误后的保障工作。 Slf4j
Configuration
public class RabbitConfig {Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback((correlationData, ack, cause) - {// 打印日志log.info(ConfirmCallback 相关数据 correlationData);log.info(ConfirmCallback 确认情况 ack);log.info(ConfirmCallback 原因 cause);// 存入redis数据库中定时重发.// ...});rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) - {// 打印日志log.info(ReturnCallback 消息 message);log.info(ReturnCallback 回应码 replyCode);log.info(ReturnCallback 回应信息 replyText);log.info(ReturnCallback 交换机 exchange);log.info(ReturnCallback 路由键 routingKey);});return rabbitTemplate;}
}在保证confirm机制和return机制的同时别忘记让消费者手动ack。 注意 为什么return机制和ack机制可以重新发送而confirm机制只能借用外部手段呢例如redis、xxl-job 因为return机制触发时消息已经到了交换机可以通过判断通过mandatory这个参数来确定是重新发送还是丢弃。只是在发送给队列时出现错误人家RabbitMQ自己设计的可以重发。 但是confirm机制触发时消息刚从生产者发送给交换机还没进入RabbitMQ只能借用外部手段了。