网站开发案例图片,车险保险网站,广西网络推广公司哪家好,沈阳网站建设模块目录 0. 引言1. 原理2. 事务消息的实现2.1 java client实现#xff08;适用于spring框架#xff09;2.2 springboot实现 3. 总结 0. 引言
rocketmq 的一大特性就是支持事务性消息#xff0c;这在诸多场景中有所应用。在之前的文章中我们已经讲解过事务消息的使用#xff0… 目录 0. 引言1. 原理2. 事务消息的实现2.1 java client实现适用于spring框架2.2 springboot实现 3. 总结 0. 引言
rocketmq 的一大特性就是支持事务性消息这在诸多场景中有所应用。在之前的文章中我们已经讲解过事务消息的使用但事务消息是如何实现的呢 今天我们就来探究其原理
1. 原理
rocketmq事务消息的基本流程分为三步 1发送 half 消息到 broker。 2执行本地事务。 3根据本地事务执行结果本地事务执行成功则将half消息标识为COMMIT将消息进行提交变成普通消息可以被消费者消费本地事务执行失败将half消息标识为ROLLBACK并将消息从消息日志中删除消费者则消费不到了。
从流程上可以看出rocketmq的事务消息和数据库的事务有些类型都有一个二阶段提交的概念。
rocketmq的half message半消息是一种特殊的消息该种消息不会被消费者消费但是可以被TransactionListener事务监听器获取到。
而我们就在事务监听器中书写自己的本地事务逻辑本地事务执行成功后才将消息正常提交这时提交后的消息才能被消费者消费到否则就会回滚消息消息就相当于从来没发送过。
这里需要注意的是因为我们的本地事务逻辑已经在半消息接收的时候处理掉了所以如果后续没有消费逻辑了就不用再在消费者里书写逻辑但如果还有后续的逻辑就可以按照正常消费流程进行书写。
但是还有一个场景我们需要考虑当本地事务执行完成后在返回“执行成功提交”或“执行失败回滚”状态时因为网络波动或者broker服务挂了导致broker没有正常收到这个状态从而无法及时把half message进行提交或回滚
这时就需要有个定时巡查机制来检查这些没有正常收到提交状态的消息的实际状态到底是什么这个巡查机制就是消息回查也称为事务消息的补偿。在事务监听器TransactionListener中就是通过checkLocalTransaction方法来实现executeLocalTransaction方法返回值一共有3种状态
COMMIT_MESSAGE 提交状态事务正常进行一般是本地事务执行成功后进行设置。告知broker提交该事务消息然后消费者可以消费该消息当然此时消费者已经执行完本地事务了再消费可以根据业务逻辑进行后续的逻辑处理如果没有相关逻辑了忽略消息即可 ROLLBACK_MESSAGE 回滚状态事务撤回broker将删除当前half消息一般是本地事务执行失败后进行设置 UNKNOW 未知状态固定时间后Broker端会通过checkLocalTransaction方法进行消息回查根据回查结果来判断该消息是提交还是回滚 示例代码
Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {// TODO 执行本地事务书写你自己的本地事务逻辑try{String body new String(msg.getBody());int i Integer.parseInt(body);// 模拟偶数执行成功奇数执行失败if(i % 2 0){System.out.println(本地事务执行成功body);// 执行成功return LocalTransactionState.COMMIT_MESSAGE;}else{System.out.println(本地事务执行失败body);// 执行失败return LocalTransactionState.ROLLBACK_MESSAGE;}}catch (Exception e){e.printStackTrace();// 执行失败return LocalTransactionState.ROLLBACK_MESSAGE;}}
因此checkLocalTransaction方法中就要书写检查本地事务状态的方法比如事务是对订单提交消息的消费那么就去查询订单状态如果已经是提交状态那么就返回COMMIT_MESSAGE否则就返回ROLLBACK_MESSAGE
示例代码 Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {// TODO 去缓存或者数据库查询当前消息的实际状态// 模拟查询到状态为1Integer status 1;// 不同实际状态对应的消息状态if (null ! status) {switch (status) {case 1:return LocalTransactionState.COMMIT_MESSAGE;case 2:return LocalTransactionState.ROLLBACK_MESSAGE;default:return LocalTransactionState.COMMIT_MESSAGE;}}return LocalTransactionState.COMMIT_MESSAGE;}2. 事务消息的实现
2.1 java client实现适用于spring框架
参考之前文章https://wu55555.blog.csdn.net/article/details/138338692
2.2 springboot实现
参考之前文章https://wu55555.blog.csdn.net/article/details/139741449
3. 总结
最后一句话总结一下rocketmq的事务消息是通过half消息即二阶段提交回查机制来实现的。