17做网站广州沙河地址,烟台商城网站建设,wordpress 搬家到新浪,最近发生的重大新闻文章目录 1.RabbitMQ消息丢失的三种情况2.RabbitMQ消息丢失解决方案2.1 针对生产者2.1.1 方案1 #xff1a;开启RabbitMQ事务2.1.2 方案2#xff1a;使用confirm机制 2.2 Exchange路由到队列失败2.3 RabbitMq自身问题导致的消息丢失问题解决方案2.3.1 消息持久化2.3.2 设置集… 文章目录 1.RabbitMQ消息丢失的三种情况2.RabbitMQ消息丢失解决方案2.1 针对生产者2.1.1 方案1 开启RabbitMQ事务2.1.2 方案2使用confirm机制 2.2 Exchange路由到队列失败2.3 RabbitMq自身问题导致的消息丢失问题解决方案2.3.1 消息持久化2.3.2 设置集群镜像模式2.3.3 消息补偿机制 2.3 针对消费者 3.总结 在使用消息队列时面对复杂的网络状况我们必须要考虑如何确保消息能够正常消费。在分析如何保证消息不丢失的问题之前我们需要对症下药什么样的情况会导致消息丢失。 1.RabbitMQ消息丢失的三种情况
在弄清消息丢失的情况之前我们先看看一条消息从产生到最终消费会经历哪些过程。
上面的图是官网中关于一条消息发送的整个流程消息会经历下面几个流程
生产者将消息发送到ExchangeExchange根据Routing Key路由到Queue消费者订阅Queue从Queue中获取数据消费
通过上面的RabbitMQ发送消息的模型我们可以知道在下面几个过程中消息可能会丢失
第一种生产者弄丢了数据。生产者将消息发送到Exchange时丢失。例如在发送过程中因为网络原因发送失败亦或者是因为发送到了一个不存在的Exchange。
第二种路由失败。这种情况就是消息已经发送到Exchange了但是Exchange将消息根据Routing Key路由到对应的Queue时失败例如这个Exchange根本就没有绑定Queue等等。
第三种客户端在处理消息时失败。客户端已经获取了消息但是在处理消息过程中出现异常没有对异常做处理导致消息丢失了。
上面这几种情况都是消息在向不同的模块传递时失败导致消息丢失了如果上面的情况都能解决也并不能保证消息不会丢失如果RabbitMQ服务宕机了如果这些消息没有被持久化等RabbitMQ服务重启之后这些没有持久化的消息也将丢失。
分析了这么多的情况可能会导致消息丢失下面将根据各种情况对应的分析来解决。
2.RabbitMQ消息丢失解决方案 2.1 针对生产者 生产者发送消息到Exchange失败 对于网络原因导致消息发送到Exchange失败这个我们很好感知我们只需要对发送异常做处理即可。排除这个原因默认情况下生产者将消息发送到Exchange是不会返回任何信息给生产者至于消息是不是真的到了服务端作为生产者根本无从可知。
对于这个问题RabbitMQ中有两种方式可以用来解决问题
通过事务机制实现通过发送方确认机制实现
2.1.1 方案1 开启RabbitMQ事务
可以选择用 RabbitMQ 提供的事务功能就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect然后发送消息如果消息没有成功被 RabbitMQ 接收到那么生产者会收到异常报错此时就可以回滚事务channel.txRollback然后重试发送消息如果收到了消息那么可以提交事务channel.txCommit。
值得我们注意的是:RabbitMQ中的事务与数据库的事务有稍许不同数据库每次都需要打开事务且最后与之对应的有commit或者rollback而RabbitMQ中channel中的事务只需要开启一次可以多次commit或者rollback。
开启事务的样例如下
// 开启事务
channel.txSelect();
try { // 这里发送消息
} catch (Exception e) { channel.txRollback();
// 这里再次重发这条消息
}
// 提交事务
channel.txCommit(); 这样看可能不太直观下面我简单写一段使用RabbitMQ的代码然后给大家解释一下
//channel开启事务
channel.txSelect();
//发送3条消息
String msgTemplate 测试事务消息内容[%d];
channel.basicPublish(tx.exchange, tx, new AMQP.BasicProperties(), String.format(msgTemplate,1).getBytes(StandardCharsets.UTF_8));
channel.basicPublish(tx.exchange, tx, new AMQP.BasicProperties(), String.format(msgTemplate,2).getBytes(StandardCharsets.UTF_8));
channel.basicPublish(tx.exchange, tx, new AMQP.BasicProperties(), String.format(msgTemplate,3).getBytes(StandardCharsets.UTF_8));
//消息回滚
channel.txRollback();
//成功提交
channel.basicPublish(tx.exchange, tx, new AMQP.BasicProperties(), String.format(msgTemplate,4).getBytes(StandardCharsets.UTF_8));
channel.txCommit();上面的方法中一共发送了4次消息前三次发送后最后调用了txRollback这将导致前三条消息回滚而没有发送成功。而第四次发送之后调用commit最后在RabbitMQ中只会有一条消息。
虽然事务可以保证消息一定被提交到服务器而且在客户端编码方面足够简单。但是它也不是那么完美在性能方面事务会带来较大的性能影响。RabbitMQ 事务机制是同步的你提交一个事务之后会阻塞在那儿采用这种方式基本上吞吐量会下来因为太耗性能。
2.1.2 方案2使用confirm机制
事务机制和 confirm 机制最大的不同在于事务机制是同步的你提交一个事务之后会阻塞在那儿但是 confirm 机制是异步的。
confirm机制是为了解决事务性能问题的一种方案我们可以通过使用channel.confirmSelect方法开启confirm模式在生产者开启了confirm模式之后每次写的消息都会分配一个唯一的id然后如果写入了rabbitmq之中rabbitmq会给你回传一个ack消息告诉你这个消息发送OK了
如果rabbitmq没能处理这个消息会回调你一个nack接口告诉你这个消息失败了你可以进行重试。而且你可以结合这个机制知道自己在内存里维护每个消息的id如果超过一定时间还没接收到这个消息的回调那么你可以进行重发。
代码样例
生产者
public static void main(String[] args) throws Exception{ConnectionFactory connectionFactorynew ConnectionFactory();connectionFactory.setHost(127.0.0.1);connectionFactory.setPort(5672);connectionFactory.setUsername(guest);connectionFactory.setPassword(guest);//设置虚拟主机connectionFactory.setVirtualHost(/);//创建一个链接Connection connection connectionFactory.newConnection();//创建channelChannel channel connection.createChannel();//消息的确认模式channel.confirmSelect();String exchangeNametest_confirm_exchange;String routeKeyconfirm.test;String msgRabbitMQ send message confirm test!;for (int i0;i5;i){channel.basicPublish(exchangeName,routeKey,null,msg.getBytes());}//确定监听事件channel.addConfirmListener(new ConfirmListener() {/*** 消息成功发送* param deliveryTag 消息唯一标签* param multiple 是否批量* throws IOException*/Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println(**********Ack*********);}/*** 消息没有成功发送* param deliveryTag* param multiple* throws IOException*/Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println(**********No Ack*********);}});}
消费者
public static void main(String[] args) throws Exception{System.out.println(消息接收start);ConnectionFactory connectionFactorynew ConnectionFactory();connectionFactory.setHost(127.0.0.1);connectionFactory.setPort(5672);connectionFactory.setUsername(guest);connectionFactory.setPassword(guest);//设置虚拟主机connectionFactory.setVirtualHost(/);//创建链接Connection connection connectionFactory.newConnection();//创建channelChannel channel connection.createChannel();String exchangeNametest_confirm_exchange;String exchangeTypetopic;//声明Exchangechannel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);String queueNametest_confirm_queue;//声明队列channel.queueDeclare(queueName,true,false,false,null);String routeKeyconfirm.#;//绑定队列和交换机channel.queueBind(queueName,exchangeName,routeKey);channel.basicConsume(queueName, true, new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(接收到消息new String(body));}});}
需要注意的是confirm机制与事务是不能共存的简单的说就是开启事务就无法使用confirm开启confirm就无法使用事务。
2.2 Exchange路由到队列失败
在生产者将消息推送到RabbitMQ时我们可以通过事务或者confirm模式来保证消息不会丢失。但是这两种措施只能保证消息到达Exchange如果我们的消息无法根据RoutingKey到达对应的Queue中那么我们的消息最后就会丢失。
对于这种情况RabbitMQ中在发送消息时提供了mandatory参数。如果mandatory为true时Exchange根据自身的类型和RoutingKey无法找到对应的Queue它将不会丢掉该消息而是会将消息返回给生产者。
代码样例
//创建Exchange
channel.exchangeDeclare(mandatory.exchange, BuiltinExchangeType.DIRECT, true, false, new HashMap());
//创建Queue
channel.queueDeclare(mandatory.queue, true, false, false, new HashMap());
//绑定路由
channel.queueBind(mandatory.queue, mandatory.exchange, mandatory);
channel.addReturnListener(new ReturnListener() {Overridepublic void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {log.error(replyCode {},replyText {},exchange{},routingKey{},body{},replyCode,replyText,exchange,routingKey,new String(body));}
});
//设置mandatory true
//void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
channel.basicPublish(mandatory.exchange, mandatory-1,true, new AMQP.BasicProperties(), 测试mandatory的消息.getBytes(StandardCharsets.UTF_8));在我们调用BasicPublish方法的时候我们设置了mandatory为true同时还给channel设置了ReturnListener用来监听路由到队列失败的消息。
2.3 RabbitMq自身问题导致的消息丢失问题解决方案
RabbitMQ本身主要应对三点 要保证rabbitMQ不丢失消息那么就需要开启rabbitMQ的持久化机制即把消息持久化到硬盘上这样即使rabbitMQ挂掉在重启后仍然可以从硬盘读取消息 如果rabbitMQ单点故障怎么办这种情况倒不会造成消息丢失这里就要提到rabbitMQ的3种安装模式单机模式、普通集群模式、镜像集群模式这里要保证rabbitMQ的高可用就要配合HAPROXY做镜像集群模式 如果硬盘坏掉怎么保证消息不丢失。
2.3.1 消息持久化
RabbitMQ 的消息默认存放在内存上面如果不特别声明设置消息不会持久化保存到硬盘上面的如果节点重启或者意外crash掉消息就会丢失所以就要对消息进行持久化处理。
在RabbitMQ中我们可以通过将durable的值设置为true来保证持久化。如何持久化下面具体说明下。要想做到消息持久化必须满足以下三个条件缺一不可。 Exchange 设置持久化 Queue 设置持久化 Message持久化发送发送消息设置发送模式deliveryMode2代表持久化消息
2.3.2 设置集群镜像模式
先来介绍下RabbitMQ三种部署模式 单节点模式最简单的情况非集群模式节点挂了消息就不能用了。业务可能瘫痪只能等待。 普通模式消息只会存在与当前节点中并不会同步到其他节点当前节点宕机有影响的业务会瘫痪只能等待节点恢复重启可用必须持久化消息情况下。 镜像模式消息会同步到其他节点上可以设置同步的节点个数但吞吐量会下降。属于RabbitMQ的HA方案
为什么设置镜像模式集群因为队列的内容仅仅存在某一个节点上面不会存在所有节点上面所有节点仅仅存放消息结构和元数据。
如果想解决上面途中问题保证消息不丢失需要采用HA 镜像模式队列。
下面介绍下三种HA策略模式 同步至所有的 同步最多N个机器 只同步至符合指定名称的nodes
但是HA 镜像队列有一个很大的缺点就是系统的吞吐量会有所下降。
2.3.3 消息补偿机制
系统是在一个复杂的环境虽然以上的三种方案基本可以保证消息的高可用不丢失的问题但是仍然会遇到消息丢失的问题如持久化的消息保存到硬盘过程中当前队列节点挂了存储节点硬盘又坏了这种情况下消息仍然会丢失。
为了避免上面这个问题我们可以让生产端首先将业务数据以及消息数据入库需要在同一个事务中消息数据入库失败则整体回滚。
然后我们根据消息表中消息状态失败则进行消息补偿措施重新发送消息处理。
2.3 针对消费者
消费者获取消息后处理消息失败
通过上面的方式我们保证了从生产者到RabbitMQ消息不会丢失现在到了消费者消费消息了。
在消费者处理业务时可能由于我们业务代码的异常导致消息没有被正常处理完但是消息已经从RabbitMQ中的队列移除了这样我们的消息就丢失了。
我同样也可以通过ACK确认机制去避免这种情况
在生产者发送消息到RabbitMQ时我们可以通过ack来确认消息是否到达了服务端与之类似的是消费者在消费消息时同样提供手动ack模式。默认情况下消费者从队列中获取消息后会自动ack我们可以通过手动ack来保证消费者主动的控制ack行为这样我们可以避免业务异常导致消息丢失的情况。
DeliverCallback deliverCallback new DeliverCallback() {Overridepublic void handle(String consumerTag, Delivery message) throws IOException {try {byte[] body message.getBody();String messageContent new String(body, StandardCharsets.UTF_8);if(error.equals(messageContent)){throw new RuntimeException(业务异常);}log.info(收到的消息内容{},messageContent);channel.basicAck(message.getEnvelope().getDeliveryTag(),false);}catch (Exception e){log.info(消费消息失败!重回队列!);channel.basicNack(message.getEnvelope().getDeliveryTag(),false,true);}}
};
CancelCallback cancelCallback new CancelCallback() {Overridepublic void handle(String consumerTag) throws IOException {log.info(取消订阅{},consumerTag);}
};
channel.basicConsume(confirm.queue,false,deliverCallback,cancelCallback);3.总结
我们通过分析消息从生产者发送消息到消费者消费消息的全过程得出了消息可能丢失的几种场景并给出了相应的解决方案如果需要保证消息在整条链路中不丢失那就需要生产端、mq自身与消费端共同去保障。
生产端对生产的消息进行状态标记开启confirm机制依据mq的响应来更新消息状态使用定时任务重新投递超时的消息多次投递失败进行报警。
mq自身开启持久化并在落盘后再进行ack。如果是镜像部署模式需要在同步到多个副本之后再进行ack。
消费端开启手动ack模式在业务处理完成后再进行ack并且需要保证幂等。
整个过程如下图所示
通过以上的处理理论上不存在消息丢失的情况但是系统的吞吐量以及性能有所下降。在实际开发中需要考虑消息丢失的影响程度来做出对可靠性以及性能之间的权衡。