网站域名购买,外贸网站建设厦门,建站宝盒成品网站演示,电子书店网站开发文章目录 消息队列#xff08;Message Queue#xff09;什么场景下#xff0c;使用消息队列#xff1f;消息队列 概述 RabbitMQ 消息队列RabbitMQ 概念名词 概念RabbitMQ 流程 RabbitMQ 安装RabbitMQ 页面介绍Exchange 交换机类型Spring Boot 整合RabbitMQAmqpAdmin 与 Rab… 文章目录 消息队列Message Queue什么场景下使用消息队列消息队列 概述 RabbitMQ 消息队列RabbitMQ 概念名词 概念RabbitMQ 流程 RabbitMQ 安装RabbitMQ 页面介绍Exchange 交换机类型Spring Boot 整合RabbitMQAmqpAdmin 与 RabbitTemplate 的使用整合的 序列化问题Spring Boot 整合的 RabbitListener 监听RabbitListener 注解RabbitHandler 注解 PostConstruct 注解 和 Primary 注解 使用RabbitMQ 发送端生产者 消息确认机制事务消息了解即可RabbitMQ消息确认的 三个阶段第一个阶段可靠抵达 - confirmCallback第二个阶段可靠抵达 - ReturnCallback 可靠抵达 消费端消费者 Ack消息确认机制第三个阶段ack 消息确认机制 电商 订单中心订单中心 的 重要经验订单登录 拦截Feign 远程调用丢失请求头问题丢失请求头 等同于 登录失效了Feign 异步情况丢失上下文问题 接口幂等性 处理防重复提交幂等性 概念幂等性 考虑情况幂等解决方案token机制各种锁机制数据库悲观锁数据库乐观锁业务层分布式锁 各种唯一约束数据库唯一约束redis set 防重 防重表全局请求唯一id 分布式事务本地事务分布式事务的 问题 以及 理论CAP定理BASE 理论 分布式事务的 多种方案2PC模式柔性事务 - TCC 事务补偿型 方案常用柔性事务 - 最大努力通知型方案常用柔性事务 - 可靠消息 最终一致性方案常用 Seata 框架Seata 介绍Seata 环境搭建Seata 的几个模式 订单服务采用 最终一致性方案 解决分布式事务问题RabbitMQ 延时队列实现定时任务延迟队列的 设计和实现锁和解锁库存 架构实现订单服务 库存解锁 场景订单服务 定时关单 场景 RabbitMQ 消息积压、丢失、重复解决方案如何保证消息可靠性 - 消息丢失消息丢失场景 一消息发送出去由于网络原因没有抵达服务器。消息丢失场景 二消息抵达BrokerBroker要将消息写入磁盘持久化才算成功。此时Broker尚未持久化完成宕机。消息丢失场景 三自动ACK的状态下。消费者收到消息但没来得及处理消息服务器宕机了。 如何保证消息可靠性 - 消息重复如何保证消息可靠性 - 消息积压 消息队列Message Queue
什么场景下使用消息队列
需要异步处理的内容。
可能某个操作要调用多个服务流程去完成这几者之间也没有强依赖性无需等待返回。 应用解耦
没有使用消息队列订单系统 调用 库存系统接口库存系统维护升级那么订单系统调用的接口也要维护升级 所以很麻烦使用消息队列订单系统 无需依靠着库存系统的接口只需要给消息队列发送消息库存系统去消息队列消费消息即可实现应用上的解耦。
流量控制(也叫做 流量削峰)
业务中涉及到某一时间段内有大量请求需要处理可以通过消息队列来做到流量控制。例如电商系统里面的 秒杀业务。
消息队列 概述
消息服务中两个重要概念消息代理message broker和 目的地destination当消息发送者发送消息以后将由消息代理接管消息代理保证消息传递到指定目的地。
消息队列主要有两种形式的目的地
队列queue点对点消息通信point - to - point
过程消息发送给消息代理消息代理将其放入一个队列接收者从里面获取消息消毒读取后移除队列。消息只有唯一的发送者 一个或多个接收者。
主题topic发布publish/订阅subscribe消息通信。
过程消息发送到主题多个接收者订阅者监听订阅这个主题那么就会在消息到达同时收到消息。
消息队列的 两种协议遵循了协议就能够调用 消息队列中间件
Spring 集成支持Spring Boot 继承自动配置
市面上的MQ产品ActiveMQ、RabbitMQ、RocketMQ、Kafka本次项目采用AMQP的Rabbitmq来做消息队列。
RabbitMQ 消息队列
RabbitMQ 概念
名词 概念
Message消息由消息头和消息体组成。消息体是不透明的消息头则由一系列的可选属性组成这些属性包括routing-key路由键、priority相对于其他消息的优先权、delivery-mode指出该消息可能需要持久性存储等。Publisher消息的生产者也是一个向交换器发布消息的客户端应用程序。Exchange交换器用来接受生产者发送的消息并将这些消息路由给服务器中的队列。四种交换器类型direct默认、fanout、topic、headers不同类型转发消息的策略也不同。Queue消息队列。Binding绑定用于消息队列和交换机之间的关联。Connection网络连接比如TCP连接。Channel信道多路复用连接中的独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接AMQP命令都是通过信道发出去的不管是发布消息、订阅队列还是接受消息这些动作都是通过信道完成。因为对于操作系统来说建立和销毁TCP都是非常昂贵的开销所以引入了信道的概念以服用一条TCP连接。Consumer消费者Virtual Host虚拟主机表示一批交换器、消息队列和相关对象。每个虚拟主机本质上就是一个mini版的RabbitMQ服务器拥有自己的队列、交换器、绑定和权限机制。目的起到隔离效果比如一个RabbitMQ中有一台针对Java调用的虚拟主机有一台针对python调用的虚拟主机这样Java虚拟主机如果出现问题也不会影响python这台也有按照开发、测试、生产环境来的。Broker表示消息队列服务器实体。
RabbitMQ 流程
RabbitMQ流程图
RabbitMQ 安装
执行docker命令。
# 1. 启动 rabbitmq:management 容器
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 \
-p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
# 2. 自动重启
docker update rabbitmq --restartalwaysrabbitmq:management 集成了web管理后台的端口。端口 解释
启动成功后访问 IP:15672 查看页面即可。
账号密码默认都是guest
RabbitMQ 页面介绍
Overview 概述Admin 管理Exchanges 交换机Queues 队列
Exchange 交换机类型
Direct Exchange直接模式直接 交换机。
将消息交给指定队列路由routing key按照绑定Binding关系将消息发到对应的队列中这个是完全匹配完全按照路由绑定关系去找对应的消息队列。这种叫做完全匹配、单播模式。例如routing key 为 dog 的消息不会转发 dog.puppy … 其他的。
Fanout Exchange广播模式 扇形 交换机
每个发到fanout类型交换器的消息都会分到所有绑定的队列上去。
Topic Exchange主题模式 主题 交换机
topic 交换机通过模式匹配分配消息的路由键属性将路由键和某个模式进行匹配。就相当于会区分路由键不同的路由会走向不同的队列。会涉及到通配符#、* Spring Boot 整合RabbitMQ
引入amqp启动类RabbitAutoConfiguration就会自动生效。
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependencyRabbitAutoConfiguration类自动配置了RabbitTemplate、AmqpAdmin 等实例对象。EnableRabbit 标识启动注解。
package com.atguigu.gulimall.order;import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;// 启动 注解
EnableRabbit
SpringBootApplication
public class GulimallOrderApplication {public static void main(String[] args) {SpringApplication.run(GulimallOrderApplication.class, args);}}配置rabbitmq属性。
# rabbitmq 配置信息
spring.rabbitmq.hostwww.gulimall.com
spring.rabbitmq.port5672
spring.rabbitmq.virtual-host/AmqpAdmin 与 RabbitTemplate 的使用
package com.atguigu.gulimall.order;import com.atguigu.gulimall.order.entity.OrderReturnReasonEntity;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import java.util.Date;SpringBootTest
class GulimallOrderApplicationTests {AutowiredAmqpAdmin amqpAdmin;AutowiredRabbitTemplate rabbitTemplate;// 发送messageTestpublic void sendMessage(){OrderReturnReasonEntity orderReturnReasonEntity new OrderReturnReasonEntity();orderReturnReasonEntity.setId(1l);orderReturnReasonEntity.setCreateTime(new Date());orderReturnReasonEntity.setName(测试实体类);// 1. 发送消息// 如果发送的消息是个对象我们会使用序列化机制将对象写出去。对象必须实现SerializablerabbitTemplate.convertAndSend(hello-java-exchange,hello.java,orderReturnReasonEntity);}// 创建交换机Testvoid createExchange() {// 参数名字、是否持久化、是否自动删除DirectExchange directExchange new DirectExchange(hello-java-exchange,true,false);amqpAdmin.declareExchange(directExchange);}// 创建队列Testvoid createQueue() {Queue queue new Queue(hello-java-queue,true,false,false);amqpAdmin.declareQueue(queue);}// 创建绑定Testvoid createBinding() {// 将exchange指定的交换机和destination目的地进行绑定使用routingKey作为指定的路由键Binding binding new Binding(hello-java-queue, // 目的地Binding.DestinationType.QUEUE, // 目的地类型hello-java-exchange, // 指定交换机hello.java, // 路由keynull // 参数);amqpAdmin.declareBinding(binding);}}整合的 序列化问题
对于实体类序列化默认为jdk序列化可以通过配置消息转换来实现Jackson2JSON序列化
package com.atguigu.gulimall.order.config;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/* author: xuyanbo* description: TODO* date: 2023/10/27 14:43*/
Configuration
public class MyRabbitConfig {Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}}Spring Boot 整合的 RabbitListener 监听
RabbitListener 注解
RabbitListener注解参数
queues声明需要监听的所有队列。RabbitListener注解标注在 类 或者 方法上。看源码注解能看出RabbitHandler注解标注在 方法上。
被RabbitListener注解 标注方法上
第一个参数 Message message原生消息相信信息消息头 消息体第二个参数 T发送的消息的类型 T content 消息体里面对应的实体信息一般为实体类第三个参数 Channel channel 当前传输数据的通道。注意要引入rabbitmq的Channel。
package com.atguigu.gulimall.order.service.impl;import com.alibaba.fastjson.JSON;
import com.atguigu.common.utils.PageUtils;
import com.atguigu.common.utils.Query;
import com.atguigu.gulimall.order.dao.OrderItemDao;
import com.atguigu.gulimall.order.entity.OrderItemEntity;
import com.atguigu.gulimall.order.entity.OrderReturnReasonEntity;
import com.atguigu.gulimall.order.service.OrderItemService;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;import com.rabbitmq.client.Channel;
import java.util.Map;Service(orderItemService)
public class OrderItemServiceImpl extends ServiceImplOrderItemDao, OrderItemEntity implements OrderItemService {Overridepublic PageUtils queryPage(MapString, Object params) {IPageOrderItemEntity page this.page(new QueryOrderItemEntity().getPage(params),new QueryWrapperOrderItemEntity());return new PageUtils(page);}/* RabbitListener参数* queues声明需要监听的所有队列。** 被标注方法参数* 第一个参数 Message message原生消息相信信息消息头 消息体* 第二个参数 T发送的消息的类型 T content 消息体里面对应的实体信息一般为实体类* 第三个参数 Channel channel 当前传输数据的通道。(有问题没有Channel类)*/RabbitListener(queues {hello-java-queue})public void receiveMessage(Message message, OrderReturnReasonEntity content, Channel channel) {// body里面存储的是发送的消息内容byte[] body message.getBody();// fixme 这种方式太麻烦可以用参数直接来接受消息体里面的内容。// OrderReturnReasonEntity orderReturnReasonEntity JSON.parseObject(body.toString(), OrderReturnReasonEntity.class);System.out.println(body: content);// properties存储的是发过来的消息头属性MessageProperties properties message.getMessageProperties();System.out.println(接收到消息...内容 message.toString() ,类型 message.getClass());}}TipQueue可以很多人都来监听。只要收到消息队列删除消息而且只能有一个收到此消息场景 订单服务启动多个同一个消息只能有一个客户端收到。单个服务一次只能处理一个消息等消息完全处理完方法运行结束才可以接受到下一个消息。 RabbitHandler 注解
被RabbitListener注解 标注类上参数
重载方法区分不同的消息的作用。针对 接受的消息类型 会有多种的情况可以使用RabbitHandler来标识不同方法来处理不同情况。 碰壁了此处有问题没有Channel类可能跟配置序列化有关系。 package com.atguigu.gulimall.order.service.impl;import com.atguigu.common.utils.PageUtils;
import com.atguigu.common.utils.Query;
import com.atguigu.gulimall.order.dao.OrderItemDao;
import com.atguigu.gulimall.order.entity.OrderEntity;
import com.atguigu.gulimall.order.entity.OrderItemEntity;
import com.atguigu.gulimall.order.entity.OrderReturnReasonEntity;
import com.atguigu.gulimall.order.service.OrderItemService;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;import com.rabbitmq.client.Channel;
import java.util.Map;RabbitListener(queues {hello-java-queue})
Service(orderItemService)
public class OrderItemServiceImpl extends ServiceImplOrderItemDao, OrderItemEntity implements OrderItemService {Overridepublic PageUtils queryPage(MapString, Object params) {IPageOrderItemEntity page this.page(new QueryOrderItemEntity().getPage(params),new QueryWrapperOrderItemEntity());return new PageUtils(page);}/* RabbitListener参数* queues声明需要监听的所有队列。** 被标注方法参数* 第一个参数 Message message原生消息相信信息消息头 消息体* 第二个参数 T发送的消息的类型 T content 消息体里面对应的实体信息一般为实体类* 第三个参数 Channel channel 当前传输数据的通道。(有问题没有Channel类)*/RabbitHandlerpublic void receiveMessageOrderReturnReasonEntity(Channel channel, Message message, OrderReturnReasonEntity content) {System.out.println(接收到消息... content);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(消息处理完成... content);}RabbitHandlerpublic void receiveMessageForOrderEntity(Channel channel, Message message,OrderEntity content) {System.out.println(接收到消息... content);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(消息处理完成... content);}}PostConstruct 注解 和 Primary 注解 使用
PostConstruct注解相当于创建完当前这个对象后之后调用的方法。可以翻译为构造器之后。例如下面就是等 MyRabbitConfig 创建实例后执行initRabbitTemplate方法给RestTemplate对象配置相关内容。
package com.atguigu.gulimall.order.config;import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Primary;import javax.annotation.PostConstruct;
import javax.annotation.Resource;/* author: xuyanbo* description: TODO* date: 2023/10/27 14:43*/
Configuration
public class MyRabbitConfig {Autowird // 此处报错闭环错误。RabbitTemplate rabbitTemplate;// 使用JSON序列化机制进行消息转换Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}/* PostConstruct注解相当于创建完MyRabbitConfig对象之后调用的方法。翻译为构造器之后。*/PostConstructpublic void initRabbitTemplate(){// 设置确认回调rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println(confirm...);System.out.println(CorrelationData: correlationData);System.out.println(ack: ack);System.out.println(cause: cause);}});}}Primary注解在Spring框架中表示当有多个相同类型的bean时使用该注解赋予bean更高的优先级。比如在Spring IOC容器中有多个相同类型的bean时当要注入该类型的bean就可以使用Primary来标注注入bean的优先优先级高的bean先被注入。
RabbitMQ 发送端生产者 消息确认机制
事务消息了解即可
事务消息
将所有的过程都锁定到一个事务中一起成功一起失败。
为了保证消息不丢失可靠抵达可以使用事务消息但是性能却下降250倍所以事务消息是不推荐使用的为此引入确认机制。
RabbitMQ消息确认的 三个阶段
先看官方文档Reliability Guide — RabbitMQ
一个完整效果图
p生产者c消费者。
pprovider - bBroker需要 confirmCallbackeExchange - qQueue需要 returnCallbackqQueue - cConsumer需要 ack
第一个阶段可靠抵达 - confirmCallback
实现 可靠抵达 步骤
可靠抵达 配置注意不同版本的SpringBoot可能会不同。
## 开启发送端消息抵达Broker确认使用的SpringBoot版本不支持弃用了
# spring.rabbitmq.publisher-confirmstrue
## 开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returnstrue
## 只要消息抵达Queue就会异步发送优先回调returnfirm
spring.rabbitmq.template.mandatorytrue
spring.rabbitmq.publisher-confirm-typecorrelated在Spring框架中spring.rabbitmq.publisher-confirms 属性是用于开启或关闭消息发布确认的。从Spring AMQP 2.0开始这个属性已经被弃用并推荐使用 spring.rabbitmq.confirm-interval 和 spring.rabbitmq.publisher-returns 这两个新的属性来代替。
rabbitTemplate的容器对象配置好对应的setConfirmCallback方法方便测试。
package com.atguigu.gulimall.order.config;import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;/* author: xuyanbo* description: TODO* date: 2023/10/27 14:43*/
Configuration
public class MyRabbitConfig {RabbitTemplate rabbitTemplate;PrimaryBeanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate new RabbitTemplate(connectionFactory);this.rabbitTemplate rabbitTemplate;rabbitTemplate.setMessageConverter(messageConverter());initRabbitTemplate();return rabbitTemplate;}// 使用JSON序列化机制进行消息转换Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}public void initRabbitTemplate(){// 设置确认回调rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/* 第一个参数 correlationData当前消息的唯一关联数据这个是消息的唯一ID* 第二个参数 ack消息是否成功收到* 第三个参数 cause失败原因*/Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println(confirm...correlationData correlationData ,ack: ack ,cause: cause);}});}}总结只要消息抵达Broker就acktrue。
第二个阶段可靠抵达 - ReturnCallback
SpringBoot 配置
## 开启发送端消息抵达Broker确认使用的SpringBoot版本不支持弃用了
# spring.rabbitmq.publisher-confirmstrue
## 开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returnstrue
## 只要消息抵达Queue就会异步发送优先回调returnfirm
spring.rabbitmq.template.mandatorytrue
## 手动ack消息不使用默认的消费端确认
spring.rabbitmq.listener.simple.acknowledge-modemanual添加消息的唯一ID方便测试。
rabbitTemplate.convertAndSend方法new CorrelationData(UUID.randomUUID().toString())的作用使用UUID来创建该消息的唯一ID。
Autowired
RabbitTemplate rabbitTemplate;// new CorrelationData(UUID.randomUUID().toString()) 添加消息唯一ID方便测试
rabbitTemplate.convertAndSend(hello-java-exchange,hello.java,orderEntity,new CorrelationData(UUID.randomUUID().toString()));配置 rabbitTemplate.setReturnsCallback方法。
package com.atguigu.gulimall.order.config;import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;/* author: xuyanbo* description: TODO* date: 2023/10/27 14:43*/
Configuration
public class MyRabbitConfig {RabbitTemplate rabbitTemplate;PrimaryBeanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate new RabbitTemplate(connectionFactory);this.rabbitTemplate rabbitTemplate;rabbitTemplate.setMessageConverter(messageConverter());initRabbitTemplate();return rabbitTemplate;}// 使用JSON序列化机制进行消息转换Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}public void initRabbitTemplate(){// 设置确认回调rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/* 1、 只要消息抵达Broker就acktrue* 第一个参数 correlationData当前消息的唯一关联数据这个是消息的唯一ID* 第二个参数 ack消息是否成功收到* 第三个参数 cause失败原因*/Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println(confirm...correlationData correlationData ,ack: ack ,cause: cause);}});// 设置消息抵达队列的确认回调rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {/* 该方法只有失败才会调用成功不会被调用。* ReturnedMessage对象里面的参数解释* private final Message message; 投递失败的消息详情信息* private final int replyCode; 回复的状态码* private final String replyText; 回复的文本内容* private final String exchange; 当时这个消息发给哪个交换机* private final String routingKey; 当时这个消息用哪个路由键*/Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.println(失败 消息Message: returnedMessage.getMessage() ,replyCode returnedMessage.getReplyCode() ,replyText returnedMessage.getReplyText() exchange returnedMessage.getExchange() ,routingKey returnedMessage.getRoutingKey());}});}}失败测试不成功待解决。 可靠抵达 消费端消费者 Ack消息确认机制 第三个阶段ack 消息确认机制 Tipack全称acknowledge 收到通知。 默认自动ack确认的只要消息接收到客户端会自动确认(队列)服务端就会自动移除这个消息。
存在问题当收到很多消息自动回复给服务器ack然而服务器宕机了。这就导致了消息的丢失。
将ack设置为手动确认之后进行测试。
手动确认模式只要我们没有明确的告诉mq货物被签收。没有Ack消息就一直是Unacked状态。即使出现了宕机消息也不会丢失会重新变为ready。
## 将ack确认设置为手动模式
spring.rabbitmq.listener.simple.acknowledge-modemanual通过Channel来手动确认消息。
channel.basicAck(deliveryTag, false) 肯定确认channel.basicNack(deliveryTag,false,true); 否定确认channel.basicReject(deliveryTag,true) 也是否定确认但是不能批量操作。
package com.atguigu.gulimall.order.service.impl;import com.atguigu.common.utils.PageUtils;
import com.atguigu.common.utils.Query;
import com.atguigu.gulimall.order.dao.OrderItemDao;
import com.atguigu.gulimall.order.entity.OrderEntity;
import com.atguigu.gulimall.order.entity.OrderItemEntity;
import com.atguigu.gulimall.order.entity.OrderReturnReasonEntity;
import com.atguigu.gulimall.order.service.OrderItemService;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;import java.io.IOException;
import java.util.Map;RabbitListener(queues {hello-java-queue})
Service(orderItemService)
public class OrderItemServiceImpl extends ServiceImplOrderItemDao, OrderItemEntity implements OrderItemService {RabbitHandlerpublic void receiveMessageOrderReturnReasonEntity(Channel channel, Message message, OrderReturnReasonEntity content) {System.out.println(接收到消息... content);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(消息处理完成... content);long deliveryTag message.getMessageProperties().getDeliveryTag();System.out.println(deliveryTag: deliveryTag);/* channel.basicAck 等同于 签收获取手动确认。* 第一个参数可以理解为一个货物标签就是消息的标签是哪个消息被确认了。* 第二个参数是否批量确认。*/// 签收货物非批量模式try {if (deliveryTag % 2 0) {// 肯定确认channel.basicAck(deliveryTag, false);} else {// 否定确认/* channel.basicNack 等同于 拒绝签收拒绝确认。* 第一个参数可以理解为一个货物标签就是消息的标签是哪个消息被确认了。* 第二个参数是否批量确认。* 第三个参数是否重新回归队列。*/channel.basicNack(deliveryTag,false,true);// channel.basicReject(deliveryTag,true);// 效果一样但是不能批量操作。 }} catch (IOException e) {// 网络中断e.printStackTrace();}}}总结 电商 订单中心
订单中心 的 重要经验
在电商系统中订单中心很重要涉及到3流分别是信息流、资金流、物流。订单中心就相当于是三者的中间整合商。订单的作用把感兴趣的商品整合一起生成一个支付单然后完成一个发货的物流过程。所以订单模块是电商系统的枢纽在订单这个环节商需求获取多个模块的数据和信息。同时对这多个信息进行加工处理流向下个环节。
订单所涉及到的信息如下
订单总流程
名词实物订单、虚拟订单话费、库存锁定下了单没支付需要锁定库存、库存解锁超时未支付解锁正常流程订单生成 -》 支付订单 -》 卖家发货 -》 确认收货 -》 交易成功。 订单生成 流程包括
创建订单 -》 验令牌幂等性 -》 验价格优惠、扣减等等 -》 锁库存只要有异常回滚订单数据
锁库存方式
通过SQL条件来控制即可。
update wms_ware_sku set stock_locked stock_locked #{num}
where sku_id #{skuId}
and ware_id #{wareId}
and stock - stock_locked #{num}订单登录 拦截
创建拦截器。
也是用到了ThreadLocal存储用户信息。
package com.atguigu.gulimall.order.interceptor;import com.atguigu.common.constant.AuthServerConstant;
import com.atguigu.common.vo.MemberRespVo;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.HandlerInterceptor;import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;/* author: xuyanbo* description: TODO* date: 2023/10/31 11:37*/
Component
public class LoginUserInterceptor implements HandlerInterceptor {public static ThreadLocalMemberRespVo loginUser new ThreadLocal();Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {MemberRespVo userInfo (MemberRespVo) request.getSession().getAttribute(AuthServerConstant.LOGIN_USER);if (userInfo ! null) {loginUser.set(userInfo);return true;} else {// 没登录去登录request.getSession().setAttribute(msg,请先进行登录);response.sendRedirect(http://auth.gulimall.com/login.html);return false;}}}注册拦截器并且配置路径。
package com.atguigu.gulimall.order.config;import com.atguigu.gulimall.order.interceptor.LoginUserInterceptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;/* author: xuyanbo* description: TODO* date: 2023/10/31 11:39*/
Configuration
public class OrderWebConfiguration implements WebMvcConfigurer {AutowiredLoginUserInterceptor interceptor;Overridepublic void addInterceptors(InterceptorRegistry registry) {registry.addInterceptor(interceptor).addPathPatterns(/);}}Feign 远程调用丢失请求头问题丢失请求头 等同于 登录失效了
问题原因
解决办法feign在发起远程调用之前会经过一大堆的拦截器我们也可以添加一个拦截器将相关信息维护上
添加一个Feign远程调用拦截器。在拦截器里面使用RequestContextHolder获取到请求以及请求头相关信息。 案例代码如下
ServletRequestAttributes attributes (ServletRequestAttributes)RequestContextHolder.getRequestAttributes(); 是重点。RequestContextHolder的原理是ThreadLocal
package com.atguigu.gulimall.order.config;import feign.RequestInterceptor;
import feign.RequestTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;import javax.servlet.http.HttpServletRequest;/* author: xuyanbo* description: TODO* date: 2023/10/31 14:56*/
Configuration
public class GuliFeignConfig {// 添加一个拦截器并且声明名字Bean(requestInterceptor)public RequestInterceptor requestInterceptor(){return new RequestInterceptor() {Overridepublic void apply(RequestTemplate template) {// 1. 使用RequestContextHolder拿到刚进来的这个请求ServletRequestAttributes attributes (ServletRequestAttributes)RequestContextHolder.getRequestAttributes();HttpServletRequest request attributes.getRequest();// 2. 同步请求头数据CookieString cookie request.getHeader(Cookie);// 3. 给Feign的请求里面同步当前请求的cookie信息template.header(Cookie,cookie);}};}}Feign 异步情况丢失上下文问题
因为RequestContextHolder 的原理是 ThreadLocal 当我们使用异步的方式进行Feign的远程调用相当于创建了多个子线程而不是主线程了这时RequestInterceptor拦截器里面的RequestContextHolder就无法获取到请求的相关信息了因为请求信息在主线程的RequestContextHolder中。解决办法
将主线程的RequestContextHolder请求属性提前拿出来赋值给多个子线程的RequestContextHolder请求属性就可以了。
Override
public OrderConfirmVo confirmOrder() throws ExecutionException, InterruptedException {OrderConfirmVo confirmVo new OrderConfirmVo();MemberRespVo memberRespVo LoginUserInterceptor.loginUser.get();System.out.println(主线程... Thread.currentThread().getId());// 主线程RequestAttributes requestAttributes RequestContextHolder.getRequestAttributes();CompletableFutureVoid getAddressFuture CompletableFuture.runAsync(() - {System.out.println(member子线程... Thread.currentThread().getId());// 在member子线程设置请求信息RequestContextHolder.setRequestAttributes(requestAttributes);// 1. 远程查询所有的收货地址列表ListMemberAddressVo address memberFeignService.getAddress(memberRespVo.getId());confirmVo.setAddress(address);}, executor);CompletableFutureVoid cartFuture CompletableFuture.runAsync(() - {System.out.println(cart子线程... Thread.currentThread().getId());// 在cart子线程设置请求信息RequestContextHolder.setRequestAttributes(requestAttributes);// 2. 远程查询购物车所有选中的购物项ListOrderItemVo items cartFeignService.getCurrentUserCartItems();confirmVo.setItems(items);}, executor);CompletableFuture.allOf(getAddressFuture,cartFuture).get();// 3. 查询用户积分Integer integration memberRespVo.getIntegration();confirmVo.setIntegration(integration);// 4. 其他数据自动计算// TODO 5. 防重令牌return confirmVo;
}接口幂等性 处理防重复提交
幂等性 概念
幂等性概念幂等性是指对同一个资源的多个请求在业务逻辑上具有相同的结果。
幂等性 考虑情况
场景案例
订单业务用户点了多次订单发起多次订单请求出现了多个订单。支付场景用户点了多次支付发起了多次支付请求结果扣款了多次。
哪些情况需要防止
用户多次点击按钮。用户页面回退再次提交。微服务互相调用由于网络问题导致请求失败feign触发重试机制。其他业务情况。
什么情况需要幂等
有一些操作是天然幂等的。
例如select * from tableA from id ? update tab1 set col1 1 where col2 2 delete from user where userId 1 insert into user(userId,name) values (1,0) userId作为唯一主键只会插入一条用户数据也是具备幂等性的。无论执行多少次结果都一样不会改变状态这些就是天然幂等的具有幂等性的。
不具有幂等性的情况。
update tab1 set col1 col1 1 where col2 2 每次操作执行结果都会发生变化这就不是幂等性的。insert into user(userId,name) values (1,0) userId,name都不是唯一主键可以重复这样的也是不具备幂等性的。
幂等解决方案
token机制
服务端提供了发送 token 的接口。我们在分析业务的时候哪些业务是存在幂等问题的 就必须在执行业务前先去获取 token服务器会把 token 保存到 redis 中。然后调用业务接口请求时把 token 携带过去一般放在请求头部。服务器判断 token 是否存在 redis 中存在表示第一次请求然后删除 token继续执行业务。如果判断 token 不存在 redis 中就表示是重复操作直接返回重复标记给 client这样就保证了业务代码不被重复执行。
例如本项目就是thymleaf渲染页面前就给页面封装了一个防重令牌token这样就是前端一个令牌后端redis存了一个令牌。
危险性
先删除 token 还是后删除 token
(1) 先删除可能导致业务确实没有执行重试还带上之前 token由于防重设计导致 请求还是不能执行。(2) 后删除可能导致业务处理成功但是服务闪断出现超时没有删除 token别人继续重试导致业务被执行两边(3) 我们最好设计为先删除 token如果业务调用失败就重新获取 token 再次请求。
Token 获取、比较和删除必须是原子性
(1) redis.get(token) 、token.equals、redis.del(token)如果这两个操作不是原子可能导 致高并发下都 get 到同样的数据判断都成功继续业务并发执行(2) 可以在 redis 使用 lua 脚本完成这个操作
if redis.call(get, KEYS[1]) ARGV[1] then return redis.call(del, KEYS[1]) else return 0 end// 1. 验证令牌[令牌的对比和删除必须保证原子性]
// 0 令牌失败1删除成功
String script if redis.call(get,KEYS[1]) ARGV[1] then return redis.call(del, KEYS[1]) else return 0 end;
String orderToken vo.getOrderToken();
Long result redisTemplate.execute(new DefaultRedisScriptLong(script, Long.class), Arrays.asList(OrderConstant.USER_ORDER_TOKEN_PREFIX memberRespVo.getId()), orderToken
);
// 原子验证令牌和删除令牌
if (result 0l) {// 令牌验证失败return response;
} else {// 令牌验证成功// 去创建订单验令牌验价格锁库存...
}各种锁机制
数据库悲观锁
悲观锁使用时一般伴随着事务一起使用数据锁定时间可能会很长需要根据实际情况选用。另外注意的时id字段一定是主键或者唯一索引不然可能造成锁表的结果处理起来会非常麻烦。
数据库乐观锁
该方法适合更新的场景中带版本号
update t_goods set count count 1 , version version 1
where good_id 2 and version 1根据version版本也就是再操作库存钱先获取到当前商品的version版本号然后操作的时候带上此version号。例如我们第一次操作库存时得到version为1调用库存服务version变成了2但返回给订单服务出现了问题订单服务又一次发起调用库存服务当订单服务传入的version还是1在执行上面的sql语句时就不会执行因为version已经变为了2了where条件就不成立。这样就保证了不管调用几次只会真正的处理一次。乐观锁主要使用于处理读多写少的问题。
业务层分布式锁
如果多个机器可能在同一时间同时处理相同的数据比如多台机器定时任务都拿到了相同数据处理我们就可以加分布式锁锁定此数据处理完成后释放锁。获取到锁的必须先判断这个数据是否背处理过。
各种唯一约束
数据库唯一约束
插入数据按照唯一索引来进行插入比如订单号这样相同的订单不可能有两条记录插入。
redis set 防重
很多数据需要处理只能被处理一次比如我们可以计算数据的MD5将其放入redis的set每次处理数据先看这个MD5是否已经存在存在就不处理。
防重表
使用订单号orderNo作为去重表的唯一索引把唯一索引插入去重表在进行业务操作且他们在同一事务中。这个保证了重复请求时因为去重表有唯一约束导致请求失败避免了幂等问题。这里注意的是去重表和业务表应该在同一库中这样就保证了在同一个事务即使业务操作失败了也会把去重表的数据回滚。这个很好的保证了数据一致性。
全局请求唯一id
调用接口时生成一个唯一idredis将数据保存到集合中去重存在即处理过可以使用nginx设置每一个请求的唯一id
proxy_set_header X-Request-Id $request_id;特别是 Feign服务触发重发请求拿着以前的老请求再重新发这样可以给这个请求设置一个全局唯一ID就算重复发了也能检测出来是否处理过。 Tips也适用于链路追踪 分布式事务
本地事务在分布式系统只能控制自己的回滚控制不了其他服务的回滚。分布式事务最大的问题就是网络问题 分布式机器。
本地事务
数据库事务的四个特性ACID
原子性atomicity一致性Consistency隔离性isolation持久性Durability
事务的隔离级别
read uncommitted 读未提交别的事务会读到其他未提交事务的数据问题脏读。read committed 读已提交一个事务可以读取另一个已提交的事务但多次读取会造成不一样的结果问题不可重复读问题。Oracle 和 SQL server 默认隔离级别。repeatable read 可重复读存在幻读问题。MySQL默认隔离级别。serializable 序列化等同于 串行 效率低。
事务的传播行为
一般都是required行为。 同一对象内事务方法互调默认失效原因 绕过了代理对象事务使用代理对象来控制的解决使用代理对象来调用事务方法。
引入aop-starter ; spring-boot-starter-aop; 引入了aspectjEnableAspectJAutoProxy(exposeProxy true)开启aspectj 动态代理功能。以后所有动态代理都是对外暴露代理对象。本类互调用调用对象
OrderServiceImpl orderService (OrderServiceImpl)AopContext.currentProxy();
orderService.b(); // 调用orderService对象的b方法
orderService.c(); // 调用orderService对象的c方法。分布式事务的 问题 以及 理论
CAP定理
CAP原则又称为CAP定理
Consistency 一致性在分布式系统中的所有数据备份在同一时刻是否同样的值。等同于所有节点访问同一份最新的数据副本Availability 可用性在集群中一部分节点故障后集群整体是否还能影响客户端的读写请求。对数据更新具备高可用性Partition tolerance 分区容错性大多数分布式系统分布在多个自网络。每个子网络就叫做一个区partition分区容错意思是区间通信可能失败比如一台服务器放在中国另一台服务器放在美国这就是两个区他们之间可能无法通信。
CAP原则指的是这三个要素最多只能同时实现两点不可能三者兼顾。
分布式系统里面分区容错肯定是要满足的然而 一致性 和 可用性 这二者是相互冲突的。
分布式系统中实现一致性的raft算法Raft 采用领导发布的效果原理还有paxos算法。 Tip总结一般市场上都是基于AP的无法保证c强一致性但是可以保证最终一致性。 BASE 理论
是对CAP理论的延伸思想是即使无法做到强一致性CAP的一致性就是强一致性但可以采用适当的采用弱一致性即最终一致性。BASE是指
Basically Available 基本可用是指分布式系统在出现故障的时候允许损失部分可用性例如响应时间、功能上的可用性允许损失部分可用性。需要注意的是基本可用绝不等价于系统不可用。Soft State 软状态软状态是指允许系统存在中间状态而该中间状态不会影响系统整体可用性。Eventual Consistency 最终一致性(弱一致性)最终一致性是指系统中的所有数据副本经过一定时间后最终能够达到一致的状态。
分布式事务的 多种方案
2PC模式
场景不适用于高并发适用于一般分布式事务该模式已经被取代延申。数据库支持的 2PC【2 phase commit 二阶提交】又叫做 XA Transactions。 MySQL 从 5.5 版本开始支持SQL Server 2005 开始支持Oracle 7 开始支持。 其中XA 是一个两阶段提交协议该协议分为以下两个阶段 第一阶段事务协调器要求每个涉及到事务的数据库预提交(precommit)此操作并反映是 否可以提交. 第二阶段事务协调器要求每个数据库提交数据。 其中如果有任何一个数据库否决此次提交那么所有数据库都会被要求回滚它们在此事务 中的那部分信息。 后来出现了3PC模式了解即可。
柔性事务 - TCC 事务补偿型 方案常用
场景不推荐在高并发场景下也是常用的分布式事务解决。 刚性事务遵循 ACID 原则强一致性。 柔性事务遵循 BASE 理论最终一致性 与刚性事务不同柔性事务允许一定时间内不同节点的数据不一致但要求最终一致。 分三个阶段第一阶段 prepare 行为调用各个服务的 Try 逻辑。第二阶段 commit 行为调用各个服务的 Confirm 逻辑。第三阶段 rollback 行为有一个服务异常则进行回滚操作 调用各个服务的 Cancel 逻辑。此处所谓的 补偿 举个例子try逻辑是 某个数据-2 了那么Cancel 补偿逻辑里面就是 2 .
柔性事务 - 最大努力通知型方案常用
场景基于消息服务的适用高并发场景。按规律进行通知不保证数据一定能通知成功但会提供可查询操作接口进行核对。这种方案主要用在与第三方系统通讯时比如调用微信或支付宝支付后的支付结果通知。这种方案也是结合 MQ 进行实现例如通过 MQ 发送 http 请求设置最大通知次数。达到通知次数后即不再通知。 案例银行通知、商户通知等各大交易业务平台间的商户通知多次通知、查询校对、对账文件支付宝的支付成功异步回调 就是不断的通知告诉你结果。
柔性事务 - 可靠消息 最终一致性方案常用
场景基于消息服务的适用高并发场景。实现业务处理服务在业务事务提交之前向实时消息服务请求发送消息实时消息服务只记录消息数据而不是真正的发送。业务处理服务在业务事务提交之后向实时消息服务确认发送。只有在得到确认发送指令后实时消息服务才会真正发送。 也是出现问题发送消息服务接受到消息后就进行回滚与上面那个相比多了消息这一步。
Seata 框架
Seata 介绍
官方地址https://seata.io/zh-cn/Seata是一款开源的分布式事务解决方案致力于提高高性能和简单易用的分布式事务服务。提供了多种模式AT、TCC、SAGA 和 XA事务模式。ATauto自动模式TCCTry、Confirm、Cancel
Seata很好理解首先弄明白下面三个名词TC事务协调者维护全局和分支。TM事务管理器处理全局事务的 开始、提交、回滚操作。所谓的全局事务是 谁是主业务发起的远程调用那么TM就在谁的上面。RM资源管理器维护分支事务。
Seata 环境搭建
创建 UNDO_LOG 表
SEATA AT 模式需要 UNDO_LOG 表文档描述很清楚
给每个微服务对应的 数据库 都创建一个 UNDO_LOG 表。目的记录日志的状态确定是否回滚。 安装事务协调器seata-server。
根据指示来就行此处我们下载的1.0.0 GA 版本的seata。 在common-server公共服务中导入Seata相关依赖
!-- 分布式事务seata --
dependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-seata/artifactId
/dependency查找一个seata-all的依赖查看版本确保与事务协调器版本一致。也可以去SpringCloud 官方去看对应版本。
本项目是 1.5.2 那么事务协调器对应的也是1.5.2版本 从github上面下载seata-server事务协调器先解压。相关配置文件解释
register.conf 注册相关配置
registry 配置注册中心type配置什么类型的注册中心。
config 配置配置中心type也是用的什么类型的配置中心默认file类型seata服务默认有个file.conf文件。也可以改成nacos。store 配置配置seata的存储方式。
本次采用file文件方式存储。
不同版本可能不太一样但是效果都差不多的
下面是1.5.2 seata 版本 配置好config、registry、store后进入bin启动seata项目。
并查看nacos是否注册成功。 所有想要用到分布式事务的客户端微服务都要适用seata DataSourceProxy代理自己的数据源。seata github上面有介绍
参考官方地址https://github.com/seata/seata-samples/tree/master/springcloud-jpa-seata package com.atguigu.gulimall.order.config;import com.zaxxer.hikari.HikariDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;import javax.sql.DataSource;/* author: xuyanbo* date: 2023/11/2 18:57*/
Configuration
public class MySeataConfig {// 数据源自带的配置属性AutowiredDataSourceProperties dataSourceProperties;Beanpublic DataSource dataSource(DataSourceProperties dataSourceProperties){// 根据源码创建数据源以及设置相关配置HikariDataSource dataSource dataSourceProperties.initializeDataSourceBuilder().type(HikariDataSource.class).build();if (StringUtils.hasText(dataSourceProperties.getName())){dataSource.setPoolName(dataSourceProperties.getName());}// 用seata的代理数据源来配置即可。return new DataSourceProxy(dataSource);}}每个客户端微服务配置seata的相关信息。
不同版本可能不太一样旧版是引入file.conf 和 registry.conf文件。1.5.2版本的seata是通过配置application.yml实现 Tipsseata.tx-service-group服务名 和 service.vgroup-mapping.服务名 的配置用来映射seata-server的识别。 spring:datasource:username: rootpassword: rooturl: jdbc:mysql://www.gulimall.com:3306/gulimall_omsdriver-class-name: com.mysql.cj.jdbc.Driver
mybatis-plus:mapper-locations: classpath:/mapper//*.xmlglobal-config:db-config:id-type: auto # 主键自增
server:port: 9000# Seata 配置
seata:tx-service-group: gulimall-order #这里每个服务都是对应不同的映射名,在配置中心可以看到registry:type: nacosnacos:server-addr: localhost:8848group: DEFAULT_GROUPservice:vgroup-mapping:#这里也要注意 key为映射名,gulimall-order: default给主事务服务订单服务添加GlobalTransactional 和 Transactional注解。
子事务服务仓储服务添加 Transactional注解 即可。
GlobalTransactional
Transactional
Override
public SubmitOrderResponseVo submitOrder(OrderSubmitVo vo) {SubmitOrderResponseVo response new SubmitOrderResponseVo();confirmVoThreadLocal.set(vo);
...启动测试分布式事务。
Seata 的几个模式
Seata 默认是AT模式是2PC的演变属于补偿性质。官方案例都已经给出几种模式的案例
AT模式相当于自动解锁不适用于高并发的分布式事务仅仅适用于一般的分布式事务。要根据实际情况来应用属于自己的分布式事务。
订单服务采用 最终一致性方案 解决分布式事务问题
因为订单服务属于高并发服务使用其他分布式方案可能会出现严重问题。因此考虑使用 可靠消息 最终一致性方案 进而保证高并发。
RabbitMQ 延时队列实现定时任务
场景比如未付款订单超过一定时间后系统自动取消订单并释放占有物品。
旧版本Spring的schedule定时任务轮询数据库缺点消耗系统内存、增加了数据库的压力、存在较大的时间误差。
新版本 RabbitMQ版本解决rabbitmq的消息TTL和死信Exchange结合。
消息的TTLTime To Live就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。
对队列设置就是队列没有消费者连着的保留时间也可以对每一个单独的消息做单独的设置。超过了这个时间我们认为这个消息就死了称之为死信。如果队列设置了消息也设置了那么会取小的。所以一个消息如果被路由到不同的队列中这个消息死亡的事件有可能不一样不同的队列设置。这里单个消息的TTL因为它才是实现延迟任务的关键。可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间两者是一样的效果。
理解DLXDead Letter Exchanges什么是死信(消息)
一个消息被Consumer拒收了并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里被其他消费者使用。basic.reject/basic.nackrequeuefalse上面的消息的TTL到了消息过期了。队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。
效果图
涉及两种消息设置过期时间 和 队列设置过期时间。 Tips推荐使用的是队列设置过期时间。因为在消息设置过期时间中rabbitmq采用惰性检查机制。例如第一个消息5分钟过期第二个消息1分钟过期那么第二个消息就必须等第一个消息过期了才能被检测到。 延迟队列的 设计和实现
基本上是每一个微服务对应一个交换机就够了交换机命名方式如服务名-事件-exchange 对应的一系列服务。业务流程图画设计流程图与业务流程图一个效果 Tips注意交换机是被复用了不要严格定义什么死信交换机之类的。其实都是普通队列和交换机只不过复用出了不同效果而已。 SpringCloud集成RabbitMQ了相关内容直接通过Bean进行注入创建即可
package com.atguigu.gulimall.order.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/* author: xuyanbo* date: 2023/11/3 14:13*/
Configuration
public class MyMQConfig {/* 通过Bean方式将Binding、Queue、Exchange 自动创建RabbitMQ对应的交换机、队列、绑定等。*/// 创建死信队列Beanpublic Queue orderDelayQueue(){MapString, Object arguments new HashMap();/* x-dead-letter-exchange 绑定死信交换机* x-dead-letter-routing-key 绑定死信路由* x-message-ttl 绑定过期时间*/arguments.put(x-dead-letter-exchange,order-event-exchange);arguments.put(x-dead-letter-routing-key,order.release.order);arguments.put(x-message-ttl,60000);Queue queue new Queue(order.delay.queue,true,false,false,arguments);return queue;}// 普通队列Beanpublic Queue orderReleaseOrderQueue(){Queue queue new Queue(order.release.order.queue,true,false,false);return queue;}// 声明交换机根据路由复用了区分好路由和绑定关系即可Beanpublic Exchange orderEventExchange(){TopicExchange topicExchange new TopicExchange(order-event-exchange,true,false);return topicExchange;}// 声明绑定死信队列关系Beanpublic Binding orderCreateOrderBingding(){return new Binding(order.delay.queue,Binding.DestinationType.QUEUE,order-event-exchange,order.create.order,null);}// 声明绑定正常消费队列关系Beanpublic Binding orderReleaseOrderBingding(){return new Binding(order.release.order.queue,Binding.DestinationType.QUEUE,order-event-exchange,order.release.order,null);}}TipsRabbitMQ中已经创建了Binding、Queue、Exchange 在重新启动微服务执行Bean时并不会重新创建也不会修改属性之类的。解决办法手动删除即可。 这样就可以测试一下延迟队列的效果
// 1. 随便写个接口
Autowired
RabbitTemplate rabbitTemplate;GetMapping(/test/orderCreate)
ResponseBody
public String createOrderTest(){// 订单下单成功OrderEntity entity new OrderEntity();entity.setOrderSn(UUID.randomUUID().toString());entity.setModifyTime(new Date());rabbitTemplate.convertAndSend(order-event-exchange,order.create.order,entity);return ok;
}
// 2. 写个监听器测试是否成功
RabbitListener(queues order.release.order.queue)
public void listener(OrderEntity entity, Channel channel , Message message) throws IOException {System.out.println(收到的过期的订单信息准备关闭订单 entity.getOrderSn());channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}锁和解锁库存 架构实现
业务流程图如下
引入rabbitmq依赖。
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependencyrabbitmq相关配置
spring.rabbitmq.hostwww.gulimall.com
spring.rabbitmq.virtual-host/添加 EnableRabbit 启动类添加rabbitmq的序列化机制转换
package com.atguigu.gulimall.ware.config;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/* author: xuyanbo* description: TODO* date: 2023/10/27 14:43*/
Configuration
public class MyRabbitConfig {// 使用JSON序列化机制进行消息转换Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}}给库存服务添加一系列的交换机、队列、绑定。
package com.atguigu.gulimall.ware.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/* author: xuyanbo* description: TODO* date: 2023/10/27 14:43*/
Configuration
public class MyRabbitConfig {// 使用JSON序列化机制进行消息转换Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}// fixme 这个监听的作用就是为了触发一下rabbitmq触发成功就会创建这些交换机或者绑定之类的。RabbitListener(queues stock.release.stock.queue)public void handler(){}// 库存服务的默认交换机Beanpublic Exchange stockEventExchange(){return new TopicExchange(stock-event-exchange,true,false);}// 普通队列Beanpublic Queue stockReleaseStockQueue(){return new Queue(stock.release.stock.queue,true,false,false);}// 延迟队列死信队列Beanpublic Queue stockDelayQueue(){MapString, Object arguments new HashMap();/* x-dead-letter-exchange 绑定死信交换机* x-dead-letter-routing-key 绑定死信路由* x-message-ttl 绑定过期时间*/arguments.put(x-dead-letter-exchange,stock-event-exchange);arguments.put(x-dead-letter-routing-key,stock.release);// 2分钟arguments.put(x-message-ttl,120000);return new Queue(stock.delay.queue,true,false,false,arguments);}// 正常队列的绑定关系Beanpublic Binding stockReleaseBinding(){return new Binding(stock.release.stock.queue,Binding.DestinationType.QUEUE,stock-event-exchange,stock.release.#,null);}// 死信队列的绑定关系Beanpublic Binding stockLockedBinding(){return new Binding(stock.delay.queue,Binding.DestinationType.QUEUE,stock-event-exchange,stock.locked,null);}} Tips记得添加一个监听方法不然没办法触发一下rabbitmq创建这些交换机或者绑定之类的。 启动服务查看创建。 订单服务 库存解锁 场景
库存解锁的场景
下订单成功订单过期没有支付被系统自动取消、被用户手动取消。都要解锁库存。下订单成功库存锁定成功接下来的业务调用失败导致订单回滚。之前锁定的库存就要自动解锁。
数据结构图
查询数据库关于这个订单的锁定库存信息。两种情况
有证明库存锁定成功了。 解锁就要看订单情况。
没有这个订单。必须解锁。有这个订单。就要看订单状态。订单已取消解锁库存。 没取消不能解锁。
没有库存锁定失败了库存回滚了。这种情况无需解锁。
还有重要一点Rabbitmq监听必须设置为手动ack模式只要解锁库存的消息失败。一定告诉服务解锁失败一定要启动手动ack模式,
# 配置spring.rabbitmq.listener.simple.acknowledge-modemanual
# 代码channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);案例代码
一定要使用这种try - catch 方式来手动回复简洁又方便。这样抛出异常等同于消息没有处理成功进行拒绝并重新放回队列。没有抛出异常等同于消息处理成功ack手动返回true
package com.atguigu.gulimall.ware.listener;import com.atguigu.common.to.mq.StockLockedTo;
import com.atguigu.gulimall.ware.service.WareSkuService;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.io.IOException;/* author: xuyanbo* description: TODO* date: 2023/11/4 10:36*/
Service
RabbitListener(queues stock.release.stock.queue)
public class StockReleaseListener {AutowiredWareSkuService wareSkuService;/* 1. 库存自动解锁* 下订单成功库存锁定成功。接下来的业务调用失败导致订单回滚。之前锁定的库存就要自动解锁。* 2. 订单失败。* 锁库存失败** 只要解锁库存的消息失败。一定告诉服务解锁失败一定要启动手动ack模式, channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);。* param to* param message*/RabbitHandlerpublic void handleStockLockedRelease(StockLockedTo to, Message message , Channel channel) throws IOException {// 这样抛出异常等同于消息没有处理成功进行拒绝并重新放回队列。// 没有抛出异常等同于消息处理成功ack手动返回truetry {System.out.println(收到解决库存的消息);wareSkuService.unlockStock(to);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}}/* 解锁* 1. 查询数据库关于这个订单的锁定库存信息。* 两种情况* 有证明库存锁定成功了。* 解锁看订单情况。* 1. 没有这个订单。必须解锁。* 2. 有这个订单。就要看订单状态。* 已取消解锁库存。* 没取消不能解锁。* 没有库存锁定失败了库存回滚了。这种情况无需解锁。*/
Override
public void unlockStock(StockLockedTo to) {StockDetailTo detail to.getDetail();Long detailId detail.getId();WareOrderTaskDetailEntity byId orderTaskDetailService.getById(detailId);if (byId ! null) {// 解锁Long id to.getId();WareOrderTaskEntity taskEntity orderTaskService.getById(id);String orderSn taskEntity.getOrderSn(); // 根据订单号查询订单状态R r orderFeignService.getOrderStatus(orderSn);if (r.getCode() 0) {// 订单数据返回成功OrderVo data r.getData(new TypeReferenceOrderVo() {});// 订单不存在 或者 4订单已经取消 都要解锁库存if (data null || data.getStatus() 4) {// 订单已经被取消了。才能解锁库存if (byId.getLockStatus() 1) {unLockStock(detail.getSkuId(), detail.getWareId(), detail.getSkuNum(), detailId);} }} else {// 消息拒接以后重新放到队列里面让别人继续消息解锁throw new RuntimeException(远程服务失败);}} else {// 无需解锁}
}private void unLockStock(Long skuId, Long wareId, Integer num, Long taskDetailId) {// 库存解锁wareSkuDao.unlockStock(skuId, wareId, num);// update wms_ware_sku set stock_locked stock_locked - #{num}// where sku_id #{skuId} and ware_id #{wareId}// 更新库存工作单的状态WareOrderTaskDetailEntity entity new WareOrderTaskDetailEntity();entity.setId(taskDetailId);entity.setLockStatus(2); // 变为已解锁orderTaskDetailService.updateById(entity);
}订单服务 定时关单 场景
架构图其实就是设置一个延迟队列即可。
注意订单关闭了必须向库存服务发送一个消息解锁库存。
整体流程也是通过rabbitmq监听器 等信息实现
注意此处用到了两个RabbitHandler注解来区分了哪个是订单到时后的主动解锁哪个是锁库存之后的解锁。 RabbitMQ 消息积压、丢失、重复解决方案
如何保证消息可靠性 - 消息丢失
消息丢失场景 一消息发送出去由于网络原因没有抵达服务器。
解决办法
做好容错方法try - catch发送消息可能会网络失败失败后要有重试机制可记录到数据库采用定期扫描重发的方式。
// 发给MQ一个
try {// TODO 保证消息一定会发送出去每一个消息都可以做好日志记录。给数据库保存每一个消息的详细信息。// TODO 定期扫描数据库将失败的消息再发送一遍。// 执行这个方法的时候执行完了网络延迟或者失败了。就要抛出异常走catch。rabbitTemplate.convertAndSend(order-event-exchange,order.release.other,orderTo);
} catch (Exception e) {// TODO 将没发送成功的消息进行重试发送。...
}做好日志记录每个消息状态是否都被服务器收到都应该记录。
可以设置一个MQ消息表将每一个消息保存下来定期扫描数据库将失败的消息再发送一遍
CREATE TABLE mq_message (message_id char(32) NOT NULL,content text,to_exchane varchar(255) DEFAULT NULL,routing_key varchar(255) DEFAULT NULL,class_type varchar(255) DEFAULT NULL,message_status int(1) DEFAULT 0 COMMENT 0-新建 1-已发送 2-错误抵达 3-已抵达,create_time datetime DEFAULT NULL,update_time datetime DEFAULT NULL,
PRIMARY KEY (message_id)
) ENGINEInnoDB AUTO_INCREMENT13 DEFAULT CHARSETutf8mb4 COMMENTMQ消息表;做好定期重发如果消息没有发送成功定期去数据库扫描未成功的消息进行重发。
消息丢失场景 二消息抵达BrokerBroker要将消息写入磁盘持久化才算成功。此时Broker尚未持久化完成宕机。
解决方式publisher也必须加入确认回调机制通过生成者和消费者的确认机制解决该问题确认成功的消息修改数据库消息状态。
消息丢失场景 三自动ACK的状态下。消费者收到消息但没来得及处理消息服务器宕机了。
解决方式一定开启手动ACK消费成功才移除失败或者没来的及处理就noAck并重新入队。
如何保证消息可靠性 - 消息重复
消息重复场景
消息消费成功事务已经提交ack时机器宕机。导致没有ack成功Broker的消息重新由unack变为ready并发送给其他消费者。消息消费事变 由于重试机制自动又将消息发送出去。成功消费ack时宕机消息由unack变为readyBroker又重新发送。
解决方法
消费者的业务消费接口应该设置为幂等性的。比如扣库存有工作单的状态标志。使用防重表redis/mysql发送消息每一个都有业务的唯一标识处理过就不用处理过了。RabbitMQ的每一个消息都有redelivered字段可以获取是否是被重新投递过来的而不是第一次投递过来的。 如何保证消息可靠性 - 消息积压
消费积压场景
消费者宕机积压。消费者消费能力不足积压。发送者发送流量太大。
解决方式
上线更多的消费者进行正常消费。上线专门的队列消费服务将消息先批量取出来记录数据库离线慢慢处理。