怎么看网站后台网页尺寸,php动态页面,贵州省住房和建设厅网网站,和平苏州网站建设目录 确保消息的可靠性RabbitMQ 消息发送可靠性分析解决方案开启事务机制发送方确认机制单条消息处理消息批量处理 失败重试自带重试机制业务重试 RabbitMQ 消息消费可靠性如何保证消息在队列RabbitMQ 的消息消费#xff0c;整体上来说有两种不同的思路#xff1a;确保消费成… 目录 确保消息的可靠性RabbitMQ 消息发送可靠性分析解决方案开启事务机制发送方确认机制单条消息处理消息批量处理 失败重试自带重试机制业务重试 RabbitMQ 消息消费可靠性如何保证消息在队列RabbitMQ 的消息消费整体上来说有两种不同的思路确保消费成功两种思路消息确认自动确认手动确认推模式手动确认拉模式手动确认 消息拒绝总结如何保证消息的可靠性。 幂等性问题背景解决思路代码 确保消息的可靠性 先确定消息可能在哪些位置丢失—不同的位置可以有不同的解决方案
发送过程 从生产者到交换机从交换机到队列 消费过程 消息在队列中消费者消费
RabbitMQ 消息发送可靠性
分析 目标 消息成功到达 Exchange消息成功到达 Queue 如果能确认这两步那么我们就可以认为消息发送成功了。 如果这两步中任一步骤出现问题那么消息就没有成功送达此时我们可能要通过重试等方式去重新发送消息多次重试之后如果消息还是不能到达则可能就需要人工介入了。 经过上面的分析我们可以确认要确保消息成功发送我们只需要做好三件事就可以了 确认消息到达 Exchange。确认消息到达 Queue。开启定时任务定时投递那些发送失败的消息
解决方案 如何确保消息成功到达 RabbitMQRabbitMQ 给出了两种方案 开启事务机制发送方确认机制 这是两种不同的方案不可以同时开启只能选择其中之一如果两者同时开启则会报如下错误
开启事务机制 事务管理器 Configuration
public class RabbitConfig {Beanpublic RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);}
}消息生产者添加事务注解并设置通信信道为事务模式 Service
public class MqService {Resourceprivate RabbitTemplate rabbitTemplate;Transactional //标记事务public void send() {rabbitTemplate.setChannelTransacted(true);//开启事务模式rabbitTemplate.convertAndSend(mq_exchange_name,mq_queue_name,hello rabbitmq!.getBytes());int i 1 / 0;//运行时必然抛出异常我们可以尝试运行该方法发现消息并未发送成功}
}当我们开启事务模式之后RabbitMQ 生产者发送消息会多出四个步骤
客户端发出请求将信道设置为事务模式。服务端给出回复同意将信道设置为事务模式。客户端发送消息。客户端提交事务。服务端给出响应确认事务提交。
上面的步骤除了第三步是本来就有的其他几个步骤都是平白无故多出来的。所以大家看到事务模式其实效率有点低这并非一个最佳解决方案。我们可以想想什么项目会用到消息中间件一般来说都是一些高并发的项目这个时候并发性能尤为重要。
所以RabbitMQ 还提供了发送方确认机制publisher confirm来确保消息发送成功这种方式性能要远远高于事务模式
发送方确认机制
单条消息处理 配置文件开启消息发送方确认机制 server:port: 8888
spring:rabbitmq:host: 192.168.29.200port: 5672username: adminpassword: adminvirtual-host: /publisher-confirm-type: correlated # 配置消息到达交换器的确认回调publisher-returns: true #配置消息到达队列的回调
# publisher-confirm-type有三个值
# none表示禁用发布确认模式默认即此。
# correlated表示成功发布消息到交换器后会触发的回调方法。
# simple类似 correlated并且支持 waitForConfirms() 和 waitForConfirmsOrDie() 方法的调用。开启两个监听 /*** author: zjl* datetime: 2024/5/9* desc:* 定义配置类实现 RabbitTemplate.ConfirmCallback 和 RabbitTemplate.ReturnsCallback 两个接口* 这两个接口前者的回调用来确定消息到达交换器后者则会在消息路由到队列失败时被调用。* * 定义 initRabbitTemplate 方法并添加 PostConstruct 注解* 在该方法中为 rabbitTemplate 分别配置这两个 Callback。*/
Configuration
Slf4j
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {public static final String MQ_EXCHANGE_NAME mq_exchange_name;public static final String MQ_QUEUE_NAME mq_queue_name;Resourceprivate RabbitTemplate rabbitTemplate;Beanpublic Queue queue() {return new Queue(MQ_QUEUE_NAME);}Beanpublic DirectExchange directExchange() {return new DirectExchange(MQ_EXCHANGE_NAME);}Beanpublic Binding binding() {return BindingBuilder.bind(queue()).to(directExchange()).with(MQ_QUEUE_NAME);}PostConstructpublic void initRabbitTemplate() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);}Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info({}:消息成功到达交换器,correlationData.getId());}else{log.error({}:消息发送失败, correlationData.getId());}}Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {log.error({}:消息未成功路由到队列,message.getMessageProperties().getMessageId());}
}测试 首先尝试将消息发送到一个不存在的交换机中 RestController
public class SendController {Resourceprivate RabbitTemplate rabbitTemplate;;RequestMapping(/send)public String send() {rabbitTemplate.convertAndSend(RabbitConfig.MQ_EXCHANGE_NAME, RabbitConfig.MQ_QUEUE_NAME,hello rabbitmq!.getBytes(),new CorrelationData(UUID.randomUUID().toString()));return send success;}
}给定一个真实存在的交换器但是给一个不存在的队列 RestController
public class SendController {Resourceprivate RabbitTemplate rabbitTemplate;;RequestMapping(/send)public String send() {//rabbitTemplate.convertAndSend(RabbitConfig.MQ_EXCHANGE_NAME, RabbitConfig.MQ_QUEUE_NAME,hello rabbitmq!.getBytes(),new CorrelationData(UUID.randomUUID().toString()));rabbitTemplate.convertAndSend(RabbitConfig.MQ_EXCHANGE_NAME,RabbitConfig.MQ_QUEUE_NAME,hello rabbitmq!.getBytes(),new CorrelationData(UUID.randomUUID().toString()));return send success;}
}可以看到消息虽然成功达到交换器了但是没有成功路由到队列因为队列不存在
消息批量处理
如果是消息批量处理那么发送成功的回调监听是一样的这里不再赘述。这就是 publisher-confirm 模式。相比于事务这种模式下的消息吞吐量会得到极大的提升
失败重试
失败重试分两种情况一种是压根没找到 MQ 导致的失败重试另一种是找到 MQ 了但是消息发送失败了
自带重试机制
前面所说的事务机制和发送方确认机制都是发送方确认消息发送成功的办法。如果发送方一开始就连不上 MQ那么 Spring Boot 中也有相应的重试机制但是这个重试机制就和 MQ 本身没有关系了这是利用 Spring 中的 retry 机制来完成的 配置 server:port: 8888
spring:rabbitmq:host: 192.168.29.200port: 5672username: adminpassword: adminvirtual-host: /publisher-confirm-type: correlated # 配置消息到达交换器的确认回调publisher-returns: true #配置消息到达队列的回调template: retry:enabled: true # 开启重试机制initial-interval: 1000ms # 重试起始间隔时间max-attempts: 10 # 最大重试次数max-interval: 10000ms # 最大重试间隔时间multiplier: 2 # 间隔时间乘数。这里配置间隔时间乘数为 2则第一次间隔时间 1 秒第二次重试间隔时间 2 秒第三次 4 秒以此类推再次启动 Spring Boot 项目然后关掉 MQ此时尝试发送消息就会发送失败进而导致自动重试
业务重试
业务重试主要是针对消息没有到达交换机的情况如果消息没有成功到达交换机此时就会触发消息发送失败回调我们可以利用起来这个回调下面说一下整体思路 准备数据库表 DROP TABLE IF EXISTS service_msg_mq_info;
CREATE TABLE service_msg_mq_info (msgid varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,empid int(11) NULL DEFAULT NULL,status int(11) NULL DEFAULT NULL,routekey varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,exchange varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,count int(11) NULL DEFAULT NULL,trytime datetime NULL DEFAULT NULL,createtime datetime NULL DEFAULT NULL,updatetime datetime NULL DEFAULT NULL,PRIMARY KEY (msgid) USING BTREE
) ENGINE InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci ROW_FORMAT Dynamic;status表示消息的状态有三个取值012 分别表示消息发送中、消息发送成功以及消息发送失败。 tryTime表示消息的第一次重试时间消息发出去之后在 tryTime 这个时间点还未显示发送成功此时就可以开始重试了。 count表示消息重试次数。 每次发送消息的时候就往数据库中添加一条记录 在消息发送的时候我们就往该表中保存一条消息发送记录并设置状态 status 为 0tryTime 为 1 分钟之后 在消息发送的时候我们就往该表中保存一条消息发送记录并设置状态 status 为 0tryTime 为 1 分钟之后 另外开启一个定时任务定时任务每隔 10s 就去数据库中捞一次消息专门去捞那些 status 为 0 并且已经过了 tryTime 时间记录把这些消息拎出来后首先判断其重试次数是否已超过 3 次如果超过 3 次则修改该条消息的 status 为 2表示这条消息发送失败并且不再重试。对于重试次数没有超过 3 次的记录则重新去发送消息并且为其 count 的值1
当然这种思路有两个弊端
去数据库走一遭可能拖慢 MQ 的 Qos不过有的时候我们并不需要 MQ 有很高的 Qos所以这个应用时要看具体情况。按照上面的思路可能会出现同一条消息重复发送的情况不过这都不是事我们在消息消费时解决好幂等性问题就行了。
当然大家也要注意消息是否要确保 100% 发送成功也要看具体情况。
RabbitMQ 消息消费可靠性
如何保证消息在队列
队列持久化—》创建的时候设置持久化搭建rabbitmq集群–保证高可用
RabbitMQ 的消息消费整体上来说有两种不同的思路 推pushMQ 主动将消息推送给消费者这种方式需要消费者设置一个缓冲区去缓存消息对于消费者而言内存中总是有一堆需要处理的消息所以这种方式的效率比较高这也是目前大多数应用采用的消费方式。这种方式通过 RabbitListener 注解去标记消费者如以下代码当监听的队列中有消息时就会触发该方法 Component
public class ConsumerDemo {RabbitListener(queues RabbitConfig.MQ_QUEUE_NAME)public void handle(String msg) {System.out.println(msg msg);}
}拉pull消费者主动从 MQ 拉取消息这种方式效率并不是很高不过有的时候如果服务端需要批量拉取消息倒是可以采用这种方式 Test
public void test01() throws UnsupportedEncodingException {Object o rabbitTemplate.receiveAndConvert(RabbitConfig.JAVABOY_QUEUE_NAME);System.out.println(o new String(((byte[]) o),UTF-8));
}调用 receiveAndConvert 方法方法参数为队列名称方法执行完成后会从 MQ 上拉取一条消息下来如果该方法返回值为 null表示该队列上没有消息了。receiveAndConvert 方法有一个重载方法可以在重载方法中传入一个等待超时时间例如 3 秒。此时假设队列中没有消息了则 receiveAndConvert 方法会阻塞 3 秒3 秒内如果队列中有了新消息就返回3 秒后如果队列中还是没有新消息就返回 null这个等待超时时间要是不设置的话默认为 0 这是消息两种不同的消费模式 如果需要从消息队列中持续获得消息就可以使用推模式 如果只是单纯的消费一条消息则使用拉模式即可。 切忌将拉模式放到一个死循环中变相的订阅消息这会严重影响 RabbitMQ 的性能
确保消费成功两种思路
为了保证消息能够可靠的到达消息消费者RabbitMQ 中提供了消息消费确认机制。当消费者去消费消息的时候可以通过指定 autoAck 参数来表示消息消费的确认方式 当 autoAck 为 false 的时候此时即使消费者已经收到消息了RabbitMQ 也不会立马将消息移除而是等待消费者显式的回复确认信号后才会将消息打上删除标记然后再删除。当 autoAck 为 true 的时候此时消息消费者就会自动把发送出去的消息设置为确认然后将消息移除从内存或者磁盘中即使这些消息并没有到达消费者。
- 属性解释
Ready 表示待消费的消息数量。Unacked 表示已经发送给消费者但是还没收到消费者 ack 的消息数量。
当我们将 autoAck 设置为 false 的时候对于 RabbitMQ 而言消费分成了两个部分 待消费的消息已经投递给消费者但是还没有被消费者确认的消息 换句话说当设置 autoAck 为 false 的时候消费者就变得非常从容了它将有足够的时间去处理这条消息当消息正常处理完成后再手动 ack此时 RabbitMQ 才会认为这条消息消费成功了。如果 RabbitMQ 一直没有收到客户端的反馈并且此时客户端也已经断开连接了那么 RabbitMQ 就会将刚刚的消息重新放回队列中等待下一次被消费。
综上所述确保消息被成功消费无非就是手动 Ack 或者自动 Ack无他。当然无论这两种中的哪一种最终都有可能导致消息被重复消费所以一般来说我们还需要在处理消息时解决幂等性问题。
消息确认
自动确认
在 Spring Boot 中默认情况下消息消费就是自动确认的通过 Componet 注解将当前类注入到 Spring 容器中然后通过 RabbitListener 注解来标记一个消息消费方法默认情况下消息消费方法自带事务即如果该方法在执行过程中抛出异常那么被消费的消息会重新回到队列中等待下一次被消费如果该方法正常执行完没有抛出异常则这条消息就算是被消费了Component
public class ConsumerDemo {RabbitListener(queues RabbitConfig.MQ_QUEUE_NAME)public void receive1(String msg) {System.out.println(msg msg);int i 1 / 0;}
}手动确认 配置修改为手动确认模式 server:port: 8888
spring:rabbitmq:host: 192.168.29.200port: 5672username: adminpassword: adminvirtual-host: /publisher-confirm-type: correlated # 配置消息到达交换器的确认回调publisher-returns: true #配置消息到达队列的回调template:retry:enabled: trueinitial-interval: 1000msmax-attempts: 10max-interval: 10000msmultiplier: 2listener:simple:acknowledge-mode: manual推模式手动确认
将消费者要做的事情放到一个 try…catch 代码块中。如果消息正常消费成功则执行 basicAck 完成确认。如果消息消费失败则执行 basicNack 方法告诉 RabbitMQ 消息消费失败。RabbitListener(queues RabbitConfig.MQ_QUEUE_NAME)
public void receive1(Message message,Channel channel) {long deliveryTag message.getMessageProperties().getDeliveryTag();try {//消息消费的代码写到这里String s new String(message.getBody());System.out.println(s s);//消费完成后手动 ackchannel.basicAck(deliveryTag, false);} catch (Exception e) {//手动 nacktry {channel.basicNack(deliveryTag, false, true);} catch (IOException ex) {ex.printStackTrace();}}
}这里涉及到两个方法 basicAck这个是手动确认消息已经成功消费该方法有两个参数 第一个参数表示消息的 id第二个参数 multiple 如果为 false表示仅确认当前消息消费成功如果为 true则表示当前消息之前所有未被当前消费者确认的消息都消费成功。 basicNack这个是告诉 RabbitMQ 当前消息未被成功消费该方法有三个参数 第一个参数表示消息的 id第二个参数 multiple 如果为 false表示仅拒绝当前消息的消费如果为 true则表示拒绝当前消息之前所有未被当前消费者确认的消息第三个参数 requeue 含义和前面所说的一样被拒绝的消息是否重新入队。当 basicNack 中最后一个参数设置为 false 的时候还涉及到一个死信队列的问题
拉模式手动确认
拉模式手动 ack 比较麻烦一些在 Spring 中封装的 RabbitTemplate 中并未找到对应的方法所以我们得用原生的办法这里涉及到的 basicAck 和 basicNack 方法跟前面的一样
public void receive2() {Channel channel rabbitTemplate.getConnectionFactory().createConnection().createChannel(true);long deliveryTag 0L;try {GetResponse getResponse channel.basicGet(RabbitConfig.MQ_QUEUE_NAME, false);deliveryTag getResponse.getEnvelope().getDeliveryTag();System.out.println(o new String((getResponse.getBody()), UTF-8));channel.basicAck(deliveryTag, false);} catch (IOException e) {try {channel.basicNack(deliveryTag, false, true);} catch (IOException ex) {ex.printStackTrace();}}
}消息拒绝
当客户端收到消息时可以选择消费这条消息也可以选择拒绝这条消息
Component
public class ConsumerDemo {RabbitListener(queues RabbitConfig.JAVABOY_QUEUE_NAME)public void receive2(Channel channel, Message message) {//获取消息编号long deliveryTag message.getMessageProperties().getDeliveryTag();try {//拒绝消息channel.basicReject(deliveryTag, true);} catch (IOException e) {e.printStackTrace();}}
}消费者收到消息之后可以选择拒绝消费该条消息拒绝的步骤分两步 获取消息编号 deliveryTag。调用 basicReject 方法拒绝消息。 调用 basicReject 方法时第二个参数是 requeue即是否重新入队。 如果第二个参数为 true则这条被拒绝的消息会重新进入到消息队列中等待下一次被消费 如果第二个参数为 false则这条被拒绝的消息就会被丢掉不会有新的消费者去消费它了。 需要注意的是basicReject 方法一次只能拒绝一条消息
总结如何保证消息的可靠性。
设置confirm和returning机制设置队列和交互机的持久化搭建rabbitMQ服务集群消费者改为手动确认机制。
幂等性问题
背景
消费者在消费完一条消息后向 RabbitMQ 发送一个 ack 确认此时由于网络断开或者其他原因导致 RabbitMQ 并没有收到这个 ack那么此时 RabbitMQ 并不会将该条消息删除当重新建立起连接后消费者还是会再次收到该条消息这就造成了消息的重复消费。同时由于类似的原因消息在发送的时候同一条消息也可能会发送两次
解决思路
采用 Redis在消费者消费消息之前现将消息的 id 放到 Redis 中存储方式如下 id-0正在执行业务id-1执行业务成功 如果 ack 失败在 RabbitMQ 将消息交给其他的消费者时先执行 setnx如果 key 已经存在说明之前有人消费过该消息获取他的值如果是 0当前消费者就什么都不做如果是 1直接 ack。极端情况第一个消费者在执行业务时出现了死锁在 setnx 的基础上再给 key 设置一个生存时间。生产者发送消息时指定 messageId
代码 添加redis依赖 dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-data-redis/artifactId
/dependency]添加redis配置 redis:host: localhostport: 6379password: 123456timeout: 3000msdatabase: 0配置类 Configuration
Slf4j
public class RabbitConfig{public final static String DIRECTNAME mq-direct;Beanpublic Queue queue() {return new Queue(hello-queue);}Beanpublic DirectExchange directExchange() {return new DirectExchange(DIRECTNAME, true, false);}Beanpublic Binding binding() {return BindingBuilder.bind(queue()).to(directExchange()).with(direct);}
}生产者 RestController
public class SendController {Resourceprivate RabbitTemplate rabbitTemplate;;RequestMapping(/send)public String send() {//携带信息发送CorrelationData messageId new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(RabbitConfig.DIRECTNAME,direct,message,messageId);return send success;}
}消费者 package cn.smbms.consumer;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.io.IOException;
import java.util.concurrent.TimeUnit;/*** author: zjl* datetime: 2024/5/9* desc: */
Component
public class DirectReceiver {Resourceprivate StringRedisTemplate stringRedisTemplate;RabbitListener(queues hello-queue)public void getMassage(String msg, Channel channel, Message message) throws IOException {//1、获取messageIDString messageID message.getMessageProperties().getHeader(spring_returned_message_correlation);//2、用redis的setnx()方法放入值 放入成功返回true 放入失败返回falseif (stringRedisTemplate.opsForValue().setIfAbsent(messageID, 0, 10, TimeUnit.SECONDS)) {//3、消费消息System.out.println(接收到消息 msg);//4、设置value值为1stringRedisTemplate.opsForValue().set(messageID, 1,10,TimeUnit.SECONDS);//5、手动ackchannel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} else {//6、如果放入值失败 获取messageID对应的valueString s stringRedisTemplate.opsForValue().get(messageID);//7、value0 什么都不做if (0.equalsIgnoreCase(s)) {return;//8、value1 手动ack} else {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}}
}