app小游戏开发公司,seo博客模板,a站app,品牌网站建设的关键要点MQ事务消息使用场景
消息队列中的“事务”#xff0c;主要解决的是消息生产者和消息消费者的数据一致性问题。 拿我们熟悉的电商来举个例子。一般来说#xff0c;用户在电商 APP 上购物时#xff0c;先把商品加到购物车里#xff0c;然后几件商品一起下单#xff0c;最后…MQ事务消息使用场景
消息队列中的“事务”主要解决的是消息生产者和消息消费者的数据一致性问题。 拿我们熟悉的电商来举个例子。一般来说用户在电商 APP 上购物时先把商品加到购物车里然后几件商品一起下单最后支付完成购物流程就可以愉快地等待收货了。 这个过程中有一个需要用到消息队列的步骤订单系统创建订单后发消息给购物车系统将已下单的商品从购物车中删除。因为从购物车删除已下单商品这个步骤并不是用户下单支付这个主要流程中必需的步骤使用消息队列来异步清理购物车是更加合理的设计。 对于订单系统来说它创建订单的过程中实际上执行了 2 个步骤的操作 1 在订单库中插入一条订单数据创建订单 2 发消息给消息队列消息的内容就是刚刚创建的订单。
购物车系统订阅相应的主题接收订单创建的消息然后清理购物车在购物车中删除订单中的商品。
在分布式系统中上面提到的这些步骤任何一个步骤都有可能失败如果不做任何处理那就有可能出现订单数据与购物车数据不一致的情况比如说 创建了订单没有清理购物车 订单没创建成功购物车里面的商品却被清掉了。
那我们需要解决的问题可以总结为在上述任意步骤都有可能失败的情况下还要保证订单库和购物车库这两个库的数据一致性。
对于购物车系统收到订单创建成功消息清理购物车这个操作来说失败的处理比较简单只要成功执行购物车清理后再提交消费确认即可如果失败由于没有提交消费确认消息队列会自动重试。
问题的关键点集中在订单系统创建订单和发送消息这两个步骤要么都操作成功要么都操作失败不允许一个成功而另一个失败的情况出现。 这就是事务需要解决的问题。
消息队列是如何实现分布式事务的
事务消息需要消息队列提供相应的功能才能实现Kafka 和 RocketMQ 都提供了事务相关功能。
回到订单和购物车这个例子。 首先订单系统在消息队列上开启一个事务。然后订单系统给消息服务器发送一个“半消息”这个半消息不是说消息内容不完整它包含的内容就是完整的消息内容半消息和普通消息的唯一区别是在事务提交之前对于消费者来说这个消息是不可见的。
半消息发送成功后订单系统就可以执行本地事务了在订单库中创建一条订单记录并提交订单库的数据库事务。然后根据本地事务的执行结果决定提交或者回滚事务消息。如果订单创建成功那就提交事务消息购物车系统就可以消费到这条消息继续后续的流程。如果订单创建失败那就回滚事务消息购物车系统就不会收到这条消息。这样就基本实现了“要么都成功要么都失败”的一致性要求。
这个实现过程中有一个问题是没有解决的。如果在第四步提交事务消息时失败了怎么办对于这个问题Kafka 和 RocketMQ 给出了 2 种不同的解决方案。
Kafka 的解决方案比较简单粗暴直接抛出异常让用户自行处理。我们可以在业务代码中反复重试提交直到提交成功或者删除之前创建的订单进行补偿。RocketMQ 则给出了另外一种解决方案
RocketMQ 中的分布式事务实现
在 RocketMQ 中的事务实现中增加了事务反查的机制来解决事务消息提交失败的问题。如果Producer 也就是订单系统在提交或者回滚事务消息时发生网络异常RocketMQ的 Broker 没有收到提交或者回滚的请求Broker 会定期去 Producer 上反查这个事务对应的本地事务的状态然后根据反查结果决定提交或者回滚这个事务。
为了支撑这个事务反查机制我们的业务代码需要实现一个反查本地事务状态的接口告知RocketMQ 本地事务是成功还是失败。
在我们这个例子中反查本地事务的逻辑也很简单我们只要根据消息中的订单 ID在订单库中查询这个订单是否存在即可如果订单存在则返回成功否则返回失败。 RocketMQ 会自动根据事务反查的结果提交或者回滚事务消息。
这个反查本地事务的实现并不依赖消息的发送方也就是订单服务的某个实例节点上的任何数据。这种情况下即使是发送事务消息的那个订单服务节点宕机了RocketMQ 依然可以通过其他订单服务的节点来执行反查确保事务的完整性。 综合上面讲的通用事务消息的实现和 RocketMQ 的事务反查机制使用 RocketMQ 事务消息功能实现分布式事务的流程如下图
很多人常有问题为什么不等待订单创建成功再向消息队列发送订单数据呢这样也不用考虑订单创建失败而发送消息的情况了。
如果先创建订单当前服务由于不可抗拒因素不能正常工作了没有给购物车系统发送消息这种情况加就会出现订单已经创建并且购物车没有清空的情况。 如果是发送半消息这种情况可以通过定期查询事务的状态然后根据然后具体的业务回滚操作或者重 新发送消息保证数据的最终一致性。
代码实现和底层代码原理RocketMQ源码分析 9.3 9.4 9.5