网站副标题的作用,公司网页网站建设ppt模板,营销推广活动策划方案,网络培训的心得体会Rocketmq 事务消息API使用
使用TransactionMQProducer类。 实现TransactionListener 接口覆盖其方法executeLocalTransaction和checkLocalTransaction 即可。
其中executeLocalTransaction 执行本地方法和checkLocalTransaction 事务状态回查。 玩法
简历一张本地事务表字段大概有rocketmq事务id业务事务id于executeLoalTransaction利用数据库事务特性和业务数据同时持久化到数据库。checkLocalTransaction. 按rocketmq事务id查询数据库是否有对应的数据。
为什么需要本地事务表
保证可靠性。当业务事务提交后节点宕机。rocketmq同样也能回查到数据。 流程分析 事务消息源码分析
实现原理是基于二阶段提交和定时事务状态回查实现的。
二阶段提交分析
涉及相关类
Producer
TransactionMQProducer
DefaultMQProducerImpl
TransactionListener
Broker
SendMessageProcessor
EndTransactionProcessor
分析流程
入口方法TransactionMQProducer.sendMessageInTransaction 投递事务消息调用DefaultMQProducerImpl.sendMessageInTransaction为消息头部增加事务消息标志发送消息。Broker 入口方法 SendMessageProcessor#sendMessage检查消息头部是否有事务标记有投递半消息。响应Producer 结果包括事务idProducer收到消息成功发送结果后执行本地事务。并通知Broker 本地事务执行结果。Broker 入口方法EndTransactionProcessor#processRequest 。按收到结果做决定。若是事务提交则投递普通消息删除半消息。若是事务回滚则删除半消息。
事务消息回查
RocketMQ 通过TramsactionalMessageCheckService 线程定时去检测RMQ_SYS_TRANS_HALF_TOPIC主题中的消息回查消息的事务状态。TransactionalMessageCheckService 的检测频率默认为1分钟可通过broker.conf文件中设置transactionCheckInterval 来改变默认值单位为毫秒
public class TransactionalMessageCheckService extends ServiceThread {private static final Logger log LoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);private BrokerController brokerController;public TransactionalMessageCheckService(BrokerController brokerController) {this.brokerController brokerController;}//.... 省略代码Overrideprotected void onWaitEnd() {long timeout brokerController.getBrokerConfig().getTransactionTimeOut();int checkMax brokerController.getBrokerConfig().getTransactionCheckMax();long begin System.currentTimeMillis();log.info(Begin to check prepare message, begin time:{}, begin);this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());log.info(End to check prepare message, consumed time:{}, System.currentTimeMillis() - begin);}}