成都 地铁 建设 网站,网站开发价位评估,wordpress做的好的网站,百度关键词快速排名文章目录 1. 简介与安装2. 基本概念3. SpringAMQP4. 交换机类型5. 消息转换器5.1 默认转换器5.2 配置JSON转换器 6 生产者的可靠性6.1 生产者超时重连机制6.2 生产者确认机制 6. MQ的可靠性6.1 数据持久化6.2 惰性队列 Lazy Queue 7. 消费者的可靠性7.1 消费者确认机制7.2 失败… 文章目录 1. 简介与安装2. 基本概念3. SpringAMQP4. 交换机类型5. 消息转换器5.1 默认转换器5.2 配置JSON转换器 6 生产者的可靠性6.1 生产者超时重连机制6.2 生产者确认机制 6. MQ的可靠性6.1 数据持久化6.2 惰性队列 Lazy Queue 7. 消费者的可靠性7.1 消费者确认机制7.2 失败重试机制7.3 失败处理策略7.4 业务幂等性方案7.4.1 唯一消息ID7.4.2 业务判断 7.5 兜底策略 8. 延迟消息8.1 死信交换机8.2 DelayExchange插件 1. 简介与安装
RabbitMQ是基于Erlang语言开发的开源消息通信中间件支持AMQPXMPPSMTPSTOMP协议消息延迟时微秒级别的。 Ubuntu系统RabbitMQ的安装
2. 基本概念
Publisher 生产者发送消息的一方Consumer 消费者接收消息的一方Queue 队列存储消息Exchange 交换机负责消息路由生产者发送的消息由交换机负责投递到相应的队列。不具备存储消息的能力因此如果没有任何队列与Exchange绑定或者没有符合路由规则的队列那么消息会丢失VirtualHost 虚拟主机起到数据隔离的作用有各自的交换机和队列
3. SpringAMQP 导入Maven依赖 dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependencySpringAMQP提供了RabbitTemplate工具用于发送消息 yml配置 spring:rabbitmq:host: 127.0.0.1 # MQ部署的机器IPport: 5672 # 端口virtual-host: /test # 虚拟主机username: admin # 用户名password: admin # 密码RabbitMQ管理系统配置 创建虚拟主机/test创建交换机test.direct创建队列test.queue将队列test.queue绑定到交换机test.direct 发送消息测试 class LearnApplicationTests {AutowiredRabbitTemplate rabbitTemplate;Testvoid testSend() {String exchange test.direct;String msg hello RabbitMQ;rabbitTemplate.convertAndSend(exchange, , msg);}
}接收消息测试 import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** author zyq*/
Component
public class SpringRabbitListener {RabbitListener(queues test.queue)public void listenSimpleQueueMessage(String msg) {System.out.println(消费者接收到消息【 msg 】);}
}一个队列上存在多个监听器时假定队列test.queue上有两个消费者listener1和listener2默认情况下队列上的消息会由两个消费者平均分配第一个消息发给listener1第二个消息发给listener2第三个消息发给listener1, …如果两个消费者的性能存在差异那么性能好的消费者的资源无法充分利用可以通过配置prefetch 1切换到“能者多劳”策略。 spring:rabbitmq:host: 127.0.0.1 # MQ部署的机器IPport: 5672 # 端口virtual-host: /test_host # 虚拟主机username: admin # 用户名password: admin # 密码listener:simple:prefetch: 1 # 能者多劳不配置的话是将消息平均发送给消费者4. 交换机类型
Fanout交换机 广播交换机将消息发送到所有绑定的队列Direct交换机 根据消息的Routing Key进行判断只有队列的Routingkey与消息的 Routingkey完全一致才会接收到消息。Topic交换机 可以让队列在绑定BindingKey 的时候使用通配符 # 匹配一个或多个词* 匹配一个词
5. 消息转换器
5.1 默认转换器
在数据传输时发送的消息被序列化为字节发送给MQ接收消息的时候还会把字节反序列化为Java对象。 只不过默认情况下Spring采用的序列化方式是JDK序列化。众所周知JDK序列化存在下列问题
数据体积过大有安全漏洞可读性差
5.2 配置JSON转换器
引入Maven依赖dependencygroupIdcom.fasterxml.jackson.dataformat/groupIdartifactIdjackson-dataformat-xml/artifactId
/dependency配置BeanBean
public MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();
}6 生产者的可靠性
一般情况下只要生产者与MQ之间的网路连接顺畅基本不会出现发送消息丢失的情况。少数情况下可能出现投递的消息没有成功入队。
6.1 生产者超时重连机制
在生产者服务中进行如下配置
spring:rabbitmq:connection-timeout: 1s # 连接超时时间template:retry:enabled: true # 开启超时重连机制initial-interval: 1000ms # 初始等待时间multiplier: 1 # 等待时长倍数下次等待时长 initial-interval * multipliermax-attempts: 3 # 重试次数当网络不稳定时超时重连机制可以提高消息的发送成功率但是SpringAMQP提供的重连机制时阻塞式的。不建议开启该功能若业务需要需要配置合理的等待时间和重试次数也可以使用异步线程来执行发送消息的代码。
6.2 生产者确认机制
配置文件配置选项
spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制并设置confirm类型# none关闭confirm机制; simple同步阻塞等待MQ的回执; correlatedMQ异步回调返回回执推荐publisher-returns: true # 开启publisher return机制Publisher Return 消息成功到达交换机但是路由失败时会触发ReturnCallback往往时编程导致的可以避免 Configuration
public class MqConfig implements ApplicationContextAware {Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {RabbitTemplate rabbitTemplate applicationContext.getBean(RabbitTemplate.class);rabbitTemplate.setReturnCallback((message, code, text, exchange, key) - {// 实现return callbackSystem.err.println(【Return Call】 message: message , replyText: text);});}
}Publisher Confirm 消息投递到交换机但是路由失败触发ReturnCallback返回ACK临时消息不需要持久化投递到交换机并入队成功返回ACK持久化消息投递到交换机入队成功并完成持久化返回ACK其他情况返回NACK标识投递失败 Test
void contextLoads() {// new CorrelationData(UUID.randomUUID().toString());CorrelationData cd new CorrelationData();cd.getFuture().addCallback(new ListenableFutureCallbackCorrelationData.Confirm() {Overridepublic void onFailure(Throwable throwable) {// Future本身发生错误一般不需要处理}Overridepublic void onSuccess(CorrelationData.Confirm confirm) {// Future处理成功if (confirm.isAck()) {// 消息发送成功 ACK} else {// 消息发送失败 NACK// 执行消息发送失败的业务逻辑}}});String exchange ;String routingKey ;String msg ;rabbitTemplate.convertAndSend(exchange, routingKey, msg, cd);
}总结 生产者确认机制比较耗费资源一般不开启不开启确认每秒钟可以投递数万的消息而开启后只能投递数千。若业务需要高可靠性只需要开启Publisher Confirm处理NACK的情况即可。
6. MQ的可靠性
消息到达MQ以后如果MQ不能及时保存也会导致消息丢失。 - MQ宕机 - 内存空间不足引发MQ阻塞执行持久化
6.1 数据持久化
交换机持久化默认开启队列持久化默认开启消息持久化Delivery-mode需要指定为2也就是持久化 - 若不开启消息持久化在内存不足时会发生MQ阻塞写磁盘PageOut - 若开启消息持久化会同步将消息写到磁盘MQ不会出现阻塞的现象速度稍微慢一点点。
6.2 惰性队列 Lazy Queue
从3.6.0开始支持从3.12开始默认使用该策略接收到消息后直接写入磁盘内存默认只保留2048条消息消费时才加载到内存支持百万消息存储
7. 消费者的可靠性
当RabbitMQ向消费者投递消息以后需要知道消费者的处理状态如何。因为消费者消费消息可能出现故障比如
消息投递的过程中出现了网络故障消费者接收到消息后突然宕机消费者接收到消息后因处理不当导致异常
7.1 消费者确认机制
RabbitMQ提供了消费者确认机制Consumer Acknowledgement当消费者处理消息后向RabbitMQ发送一个回执告知RabbitMQ自己消息处理状态。回执有三种可选值
ack成功处理消息RabbitMQ从队列中删除该消息nack消息处理失败RabbitMQ需要再次投递消息reject消息处理失败并拒绝该消息RabbitMQ从队列中删除该消息
消息确认机制的实现方式
none不处理。即消息投递给消费者后立刻ack消息会立刻从MQ删除。非常不安全不建议使用manual手动模式。需要自己在业务代码中调用api发送ack或reject存在业务入侵但更灵活auto自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强当业务正常执行时则自动返回ack. 当业务出现异常时根据异常判断返回不同结果 如果是业务异常会自动返回nack如果是消息处理或校验异常自动返回reject比如发生MessageConversionException
7.2 失败重试机制
开启消费者确认机制后如果消息处理一直返回NACK那么消息会反复进行入队和处理会导致MQ压力飙升。 而开启失败重试机制后消息会在本地重试而不是重新入队本地重试达到最大次数后默认会返回reject丢弃消息。 在消费者服务的配置文件中进行配置
spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数下次等待时长 multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态false有状态。如果业务中包含事务这里改为false7.3 失败处理策略
本地重试达到最大次数后默认会返回reject丢弃消息而有些业务显然无法接受消息的丢失。MQ支持之定义重试次数耗尽后的处理策略
RejectAndDontRequeueRecoverer重试耗尽后直接reject丢弃消息默认方式ImmediateRequeueMessageRecoverer重试耗尽后返回nack消息重新入队RepublishMessageRecoverer重试耗尽后将失败消息投递到指定的交换机。后续可进行人工处理 需要定义如下配置类
Configuration
public class MqErrorConfig {private final static String ERROR_EXCHANGE error.direct;private final static String ERROR_QUEUE error.queue;private final static String ERROR_ROTING_KEY error;/*** 创建处理失败消息的交换机* return*/Beanpublic DirectExchange errorExchange() {return new DirectExchange(ERROR_EXCHANGE);}/*** 创建存放失败消息的队列* return*/Beanpublic Queue errorQueue() {return new Queue(ERROR_QUEUE);}/*** 交换机与队列绑定* param errorQueue* param errorExchange* return*/Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) {return BindingBuilder.bind(errorQueue).to(errorExchange).with(ERROR_ROTING_KEY);}/*** 注册处理失败消息处理策略* param rabbitTemplate* return*/Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {return new RepublishMessageRecoverer(rabbitTemplate, ERROR_EXCHANGE, ERROR_ROTING_KEY);}
}7.4 业务幂等性方案
在程序开发中则是指同一个业务执行一次或多次对业务状态的影响是一致的。
7.4.1 唯一消息ID
每一条消息都生成一个唯一的id与消息一起投递给消费者。消费者接收到消息后处理自己的业务业务处理成功后将消息ID保存到数据库如果下次又收到相同消息去数据库查询判断是否存在存在则为重复消息放弃处理。 进行如下配置SpringAMQP会在消息头部自动添加唯一ID
Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jjmc new Jackson2JsonMessageConverter();// 2.配置自动创建消息id用于识别不同消息也可以在业务中基于ID判断是否是重复消息jjmc.setCreateMessageIds(true);return jjmc;
}7.4.2 业务判断
非幂等性业务会对数据进行更改那么我们在执行业务逻辑前可先判断数据记录是否处于未处理状态比如可以根据订单的状态。
7.5 兜底策略
开启定时任务主动去查询数据库判断数据有需要处理的数据。
8. 延迟消息
8.1 死信交换机
设计两个队列两个交换机当消息过期时消息会被投递到死信队列只需监听死信队列即可。通过设置队列dead-letter-exchange指定过期的消息投递的交换机也就是死信交换机。对于消息通过expration指定过期时间。 然而RabbitMQ的消息过期是基于追溯方式来实现的也就是说当一个消息的TTL到期以后不一定会被移除或投递到死信交换机而是在消息恰好处于队首时才会被处理。 当队列中消息堆积很多的时候过期消息可能不会被按时处理因此你设置的TTL时间不一定准确。
8.2 DelayExchange插件
开启队列的delayed配置并且在投递消息时设置delay时长。 延迟消息插件内部会维护一个本地数据库表同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长可能会导致堆积的延迟消息非常多会带来较大的CPU开销同时延迟消息的时间会存在误差。 因此不建议设置延迟时间过长的延迟消息。 改进策略将消息的delay时长分段比如将延迟时间切割成10s 10s 10s 15s 15s …大部分消息在前30s内就已经可以被消费不需要等到30分钟可以有效防止消息堆积。
参考资料https://www.bilibili.com/video/BV1mN4y1Z7t9/