在线免费建网站,盐城网站建设优化建站,网站如何搭建,外贸app网站开发概叙
本文探讨 RocketMQ 的事务消息原理#xff0c;并从源码角度进行分析#xff0c;以及事务消息适合什么场景#xff0c;使用事务消息需要注意哪些事项。
同时详细介绍RocketMQ 事务消息的基本流程#xff0c;并通过源码分析揭示了其内部实现原理#xff0c;尽管事务消…概叙
本文探讨 RocketMQ 的事务消息原理并从源码角度进行分析以及事务消息适合什么场景使用事务消息需要注意哪些事项。
同时详细介绍RocketMQ 事务消息的基本流程并通过源码分析揭示了其内部实现原理尽管事务消息增加了系统的复杂性但在需要保证消息一致性的场景中它仍然是一种非常有效的解决方案比如资金转账、订单处理、分布式事务、库存管理等场景。
什么是事务消息
事务消息是为了保证分布式系统中消息的一致性而引入的一种消息类型。事务消息允许消息发送方在发送消息后进行本地事务操作并根据本地事务的执行结果来决定消息的最终状态提交或回滚。
RocketMQ 事务消息的优缺点
优点 保证消息一致性通过事务消息RocketMQ 能够保证分布式系统中消息的一致性避免数据不一致问题。 高性能RocketMQ 的事务消息性能较高能够满足高并发场景的需求。 易用性RocketMQ 提供了简单易用的 API使得开发者能够方便地使用事务消息。
缺点 复杂性事务消息的引入增加了系统的复杂性开发者需要处理事务状态回查等问题。 时延事务消息的处理涉及half消息、回查等操作可能会增加消息的时延。
事务消息适用场景
资金转账
在金融系统中资金转账需要确保资金的一致性和安全性。例如从账户 A 转账到账户 B必须确保 A 的金额减少和 B 的金额增加是一个原子操作。使用事务消息可以保证在转账过程中如果任何一个步骤失败整个操作都会回滚确保数据一致性。
订单处理
在电子商务系统中订单处理通常涉及多个步骤例如创建订单、扣减库存、生成支付记录等。这些步骤需要保证一致性。使用事务消息可以确保如果某一步操作失败整个订单处理过程可以回滚避免数据不一致。
分布式事务
在微服务架构中分布式事务是一个常见的挑战。多个微服务之间的操作需要协调一致事务消息可以作为一种分布式事务解决方案确保各个微服务之间的数据一致性。
库存管理
在库存管理系统中库存的增减操作需要保证一致性。例如用户下单后需要扣减库存使用事务消息可以确保在扣减库存失败时订单状态不会被错误更新。
事务消息注意事项
确保本地事务的幂等性
在分布式系统中本地事务操作可能会被多次执行。例如在事务状态回查时Broker 可能会多次检查本地事务状态。因此确保本地事务操作的幂等性非常重要。幂等性可以确保多次执行相同的操作不会产生副作用。
设置合理的超时时间
事务消息的处理涉及half消息、提交或回滚请求以及事务状态回查。设置合理的超时时间可以避免长时间等待影响系统性能。超时时间应根据实际业务需求和系统性能进行调整。
处理事务状态回查
事务状态回查是事务消息的重要机制。当 Broker 在规定时间内没有收到提交或回滚请求时会主动发起事务状态回查。开发者需要实现 TransactionCheckListener 接口并在 checkLocalTransactionState 方法中处理回查逻辑确保能够正确返回事务状态。
监控和日志
监控和日志是确保事务消息系统稳定运行的重要手段。通过监控可以及时发现系统中的异常情况例如事务状态回查失败、消息发送失败等。日志记录可以帮助开发者排查问题分析系统性能。
资源隔离
在使用事务消息时确保事务消息与其他普通消息的资源隔离以避免相互影响。例如可以为事务消息单独配置 Topic 和队列确保事务消息的处理不受其他消息影响。
事务消息的重试机制
在某些情况下事务消息的提交或回滚请求可能会失败。开发者需要考虑实现重试机制以确保最终能够正确提交或回滚事务消息。重试机制可以通过定时任务或消息队列实现。
性能影响
事务消息的处理涉及多次网络通信和状态检查可能会对系统性能产生一定影响。在高并发场景中需要评估事务消息对系统性能的影响并进行相应的优化。例如可以通过批量处理、异步处理等方式提高性能。 RocketMQ事务消息Transactional Message
RocketMQ事务消息Transactional Message是指应用本地事务和发送消息操作可以被定义到全局事务中要么同时成功要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布式事务功能通过事务消息能达到分布式事务的最终一致。 RocketMQ 事务消息的基本流程
RocketMQ 的事务消息是指在消息发送方发送消息后需要经过两阶段提交来确保消息的可靠性传递和处理。 Producer 发送 half 消息Broker 先把消息写入 topic 是 RMQ_SYS_TRANS_HALF_TOPIC 的队列之后给 Producer 返回成功Producer 执行本地事务成功后给 Broker 发送 commit 命令本地事务执行失败则发送 rollbackBroker 收到 commit 请求后把消息状态更改为成功并把消息推到真正的 topicConsumer 拉取消息进行消费。
代码如下
public class ProducerTransactionListenerImpl implements TransactionListener {Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {/*** 这里执行本地事务执行成功返回LocalTransactionState.COMMIT_MESSAGE,执行失败返回* LocalTransactionState.ROLLBACK_MESSAGE,如果返回LocalTransactionState.UNKNOW* Broker会回来查询所以需要记录事务执行状态*/return LocalTransactionState.COMMIT_MESSAGE;}Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {/*** 这里查询事务执行状态根据事务状态返回LocalTransactionState.COMMIT_MESSAGE或* LocalTransactionState.ROLLBACK_MESSAGE,如果没有查询到返回LocalTransactionState.UNKNOW* Broker会再次查询可以记录查询次数超过次数后返回ROLLBACK_MESSAGE*/return LocalTransactionState.UNKNOW;}
}维度 8消息索引
我们知道RocketMQ 核心的数据文件有 3 个CommitLog、ConsumeQueue 和 Index。其中Index 文件就是一个索引文件结构如下图 查找消息时首先根据消息 key 的 hashcode 计算出 Hash 槽的位置然后读取 Hash 槽的值计算 Index 条目的位置从Index 条目位置读取到消息在 CommitLog 文件中的 offset从而查找到消息。
在 Producer 发送消息时可以指定一个 key代码如下
Message sendMessage new Message(topic1, tag1, message.getBytes());
sendMessage.setKeys(weiyiid);这样可以通过 RocketMQ 提供的命令或者管理控制台来查询消息是否发送成功。
RocketMQ 的事务消息流程大致可以分为以下几个阶段 发送half消息生产者首先发送一条 half消息 到 RocketMQ Broker。half消息是指消息已经发送到 Broker但此时消息状态不确定该消息对消费者不可见。 执行本地事务生产者在发送half消息之后立即执行本地事务操作。例如更新数据库、调用外部服务等。 提交或回滚事务消息本地事务操作完成后生产者根据本地事务的执行结果向 Broker 发送 提交 或 回滚 请求。如果本地事务执行成功则发送 提交 请求使得half消息变为可消费的正式消息如果本地事务失败则发送 回滚 请求Broker 将删除该half消息。 事务状态回查如果在规定时间内 Broker 没有收到提交或回滚请求Broker 会主动向消息发送方发起事务状态回查以确认该消息的最终状态。 Apache RocketMQ在4.3.0版中已经支持分布式事务消息采用了2PC两阶段提交 补偿机制事务状态回查的思想来实现了提交事务消息同时增加一个补偿逻辑来处理二阶段超时或者失败的消息如上图所示。 RocketMQ 事务消息的源码分析
下面给出一个完整事务消息发送示例
public class TransactionProducer {public static void main(String[] args) throws Exception {// 创建事务消息生产者TransactionMQProducer producer new TransactionMQProducer(TransactionProducerGroup);producer.setNamesrvAddr(localhost:9876);// 设置事务状态回查监听器producer.setTransactionCheckListener(new TransactionCheckListener() {Overridepublic LocalTransactionState checkLocalTransactionState(MessageExt msg) {// 处理事务状态回查逻辑System.out.println(Checking transaction state for message: new String(msg.getBody()));return LocalTransactionState.COMMIT_MESSAGE;}});// 启动生产者producer.start();// 发送事务消息Message msg new Message(TransactionTopic, TagA, Transaction Message.getBytes());SendResult sendResult producer.sendMessageInTransaction(msg, new LocalTransactionExecuter() {Overridepublic LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {// 执行本地事务逻辑System.out.println(Executing local transaction for message: new String(msg.getBody()));// 假设本地事务执行成功返回 COMMIT_MESSAGE// 如果本地事务失败返回 ROLLBACK_MESSAGEreturn LocalTransactionState.COMMIT_MESSAGE;}}, null);System.out.println(Send result: sendResult);// 阻塞主线程防止退出System.in.read();// 关闭生产者producer.shutdown();}
}客户端的事务消息处理
发送half消息
发送half消息的核心代码在 TransactionMQProducer 类中通过 sendMessageInTransaction 方法实现
public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, Object arg) {// 1. 发送half消息SendResult sendResult this.defaultMQProducerImpl.send(msg);// 2. 执行本地事务LocalTransactionState localTransactionState tranExecuter.executeLocalTransactionBranch(msg, arg);// 3. 根据本地事务状态提交或回滚消息this.endTransaction(msg, localTransactionState);return new TransactionSendResult(sendResult, localTransactionState);
}在 sendMessageInTransaction 方法中首先调用 send 方法发送half消息然后执行本地事务并根据本地事务的结果调用 endTransaction 方法提交或回滚消息。
执行本地事务
本地事务的执行由 LocalTransactionExecuter 接口的实现类来完成。在实际使用中用户需要实现该接口并在 executeLocalTransactionBranch 方法中定义具体的本地事务逻辑。
public interface LocalTransactionExecuter {LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg);
}提交或回滚事务消息
提交或回滚事务消息的实现也在 TransactionMQProducer 类中通过 endTransaction 方法完成
private void endTransaction(Message msg, LocalTransactionState localTransactionState) {// 构建事务结束请求EndTransactionRequestHeader requestHeader new EndTransactionRequestHeader();requestHeader.setCommitOrRollback(localTransactionState LocalTransactionState.COMMIT_MESSAGE ? 0 : 1);requestHeader.setTranStateTableOffset(msg.getQueueOffset());requestHeader.setCommitLogOffset(msg.getCommitLogOffset());// 发送事务结束请求到 Brokerthis.defaultMQProducerImpl.endTransaction(msg, requestHeader);
}在 endTransaction 方法中根据本地事务的执行结果构建事务结束请求并调用 endTransaction 方法将请求发送到 Broker。
事务状态回查
事务状态回查是由 Broker 发起的。当 Broker 在规定时间内没有收到提交或回滚请求时会主动向消息发送方发起事务状态回查。回查的实现主要在 TransactionCheckListener 接口中
public interface TransactionCheckListener {LocalTransactionState checkLocalTransactionState(final MessageExt msg);
}消息发送方需要实现 TransactionCheckListener 接口并在 checkLocalTransactionState 方法中定义如何检查本地事务的状态。
Broker 端的事务消息处理
Broker 端的事务消息处理主要在 TransactionalMessageServiceImpl 类中实现。Broker 负责接收half消息、提交或回滚请求并在必要时发起事务状态回查。
接收half消息
Broker 接收half消息的逻辑在 TransactionalMessageServiceImpl 类的 prepareMessage 方法中
public PutMessageResult prepareMessage(MessageExtBrokerInner msgInner) {// 存储half消息return this.store.putMessage(msgInner);
}提交或回滚消息
Broker 处理提交或回滚请求的逻辑在 TransactionalMessageServiceImpl 类的 commitMessage 和 rollbackMessage 方法中
public boolean commitMessage(MessageExt msgExt) {// 提交消息return this.store.commitTransaction(msgExt);
}public boolean rollbackMessage(MessageExt msgExt) {// 回滚消息return this.store.rollbackTransaction(msgExt);
}事务状态回查
Broker 发起事务状态回查的逻辑在 TransactionalMessageServiceImpl 类的 check 方法中
public void check(long transactionTimeout, int transactionCheckMax, String topic) {// 遍历half消息队列发起事务状态回查ListMessageExt halfMessages this.store.getHalfMessages(topic);for (MessageExt msg : halfMessages) {// 发起回查请求this.brokerController.getBroker2Client().checkProducerTransactionState(msg);}
}RocketMQ分布式事务原理
分布式事务应用场景
随着应用的拆分从单体架构变成分布式架构那么每个服务或者模块也会有自己的数据库。一个业务流程的完成需要经过多次的接口调用或者多条MQ消息的发送。 那基于上面的应用场景应该如何设计发送消息的流程才能让这两个操作要么都成功要么都失败呢其实可以参照XA两阶段提交的思想把发送消息分成两步然后把操作本地数据库也包括在这个流程中。 那么在介绍原理之前先科普一下两个新的概念 1、半消息Half Message也就是暂不能投递消费者的消息。发送方已经将消息成功发送到了 MQ 服务端但是服务端未收到生产者对这条消息的二次确认这个时候这条消息会被标记为“暂不能投递”状态。 2、消息回查Message Status Check由于网络闪断、生产者应用重启等原因导致某条事务消息的 二次确认丢失MQ 服务端通过扫描发现某条消息长期处于“半消息”时需要主动向消息生产者询问该消息的最终状态要么是Commit要么Rollback。 如图所示一共分为七个步骤 第一步生产者向 MQ 服务端发送消息。 第二步MQ 服务端将消息持久化成功之后向发送方 ACK 确认消息已经发送成功此时消息为半消息。 第三步发送方开始执行本地数据库事务逻辑。 第四步发送方根据本地数据库事务执行结果向 MQ Server 提交二次确认MQ Server 收到 Commit状态则将半消息标记为可投递订阅方最终将收到该消息MQ Server 收到 Rollback 状态则删除半消息订阅方将不会接受该消息。 第五步在断网或者是应用重启的特殊情况下按步骤4提交的二次确认最终未到达 MQ Server经过固定时间后 MQ Server 将对该消息发起消息回查。 第六步发送方收到消息回查后需要检查对应消息的本地事务执行的最终结果。 第七步发送方根据检查得到的本地事务的最终状态再次提交二次确认MQ Server 仍按照步骤4对半消息进行操作Commit/Rollback。
RocketMQ事务消息使用限制 使用事务消息有一些限制条件
事务消息不支持延时消息和批量消息事务性消息可能不止一次被检查或消费所以消费者端需要做好消费幂等
为了避免单个消息被检查太多次而导致半队列消息累积我们默认将单个消息的检查次数限制为 15 次即默认只会回查15次
我们可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。
如果已经检查某条消息超过 N 次的话 N transactionCheckMax 则 Broker 将丢弃此消息并在默认情况下同时打印错误日志。
用户可以通过重写AbstractTransactionCheckListener 类来修改这个行为
事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。
当发送事务消息时用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制该参数优先于 transactionMsgTimeout 参数
提交给用户的目标主题消息可能会失败目前这依日志的记录而定。
它的高可用性通过 RocketMQ 本身的高可用性机制来保证如果希望确保事务消息不丢失、并且事务完整性得到保证建议使用同步的双重写入机制。 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。1.事务消息执行时间限制RocketMQ 要求事务消息的本地事务执行器TransactionListener在规定的时间内完成并返回事务执行结果否则可能会触发回查机制。2.回查机制的限制如果发送方应用程序长时间未返回事务执行结果RocketMQ 服务端会触发回查机制这可能会增加系统的负担和网络开销。3.不支持跨集群事务消息RocketMQ 不支持跨集群的事务消息即发送方和消费方需要处于同一个 RocketMQ 集群中。4.事务消息的可靠性和性能权衡由于事务消息需要经过两阶段提交相比普通消息可能存在一定的性能损耗。5.需要依赖本地事务执行器发送方应用程序需要自行实现和注册本地事务执行器确保本地事务的正确执行和结果反馈。
总的来说RocketMQ 事务消息在确保消息可靠传递的同时也需要开发者按照一定的规范来设计和实现本地事务执行器以及处理可能的回查请求这些都是在使用 RocketMQ 事务消息时需要考虑和遵循的限制。
RocketMQ事务消息怎么实现
在 RocketMQ 中实现事务消息可以通过使用事务生产者Transaction Producer来完成。下面是一个简单的示例代码演示了如何在 RocketMQ 中实现事务消息
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class TransactionProducer {public static void main(String[] args) throws Exception {TransactionMQProducer producer new TransactionMQProducer(transaction_producer_group);producer.setNamesrvAddr(your_namesrv_address);// 定义事务监听器TransactionListener transactionListener new TransactionListenerImpl();producer.setTransactionListener(transactionListener);// 定义线程池来处理事务消息的预备、提交和回查ThreadPoolExecutor executor new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,new ArrayBlockingQueue(2000), new ThreadFactory() {Overridepublic Thread newThread(Runnable r) {Thread thread new Thread(r);thread.setName(client-transaction-msg-check-thread);return thread;}});producer.setExecutorService(executor);producer.start();// 发送事务消息Message message new Message(YourTopic, YourTag, YourKeys, YourMsg.getBytes());producer.sendMessageInTransaction(message, null);// 关闭生产者producer.shutdown();}
}class TransactionListenerImpl implements TransactionListener {Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 在此处执行本地事务根据执行结果返回不同的状态return LocalTransactionState.COMMIT_MESSAGE; // or ROLLBACK_MESSAGE or UNKNOW}Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {// 在此处检查本地事务的状态并返回相应的状态return LocalTransactionState.COMMIT_MESSAGE; // or ROLLBACK_MESSAGE or UNKNOW}
}以上代码中我们创建了一个事务生产者 TransactionMQProducer并设置了事务监听器 TransactionListener。
在事务监听器的实现中我们需要实现 executeLocalTransaction 方法来执行本地事务以及 checkLocalTransaction 方法来检查本地事务的状态。
在 executeLocalTransaction 中根据本地事务的执行结果返回不同的状态而在 checkLocalTransaction 中根据本地事务的状态返回相应的状态。 使用事务消息时需要确保消息发送的可靠性以及本地事务的正确执行和状态的正确返回。在实际场景中还需要根据业务逻辑来合理处理事务消息的执行和状态回查。