公司建设网站的费用,慈溪开发小学网站建设,山东省建设厅教育网站,wordpress 文章 路径Seata源码分析-2PC核心源码解读
2PC提交源码流程
上节课我们分析到了GlobalTransactionalInterceptor全局事务拦截器#xff0c;一旦执行拦截器#xff0c;我们就会进入到其中的invoke方法#xff0c;在这其中会做一些GlobalTransactional注解的判断#xff0c;如果有注解…Seata源码分析-2PC核心源码解读
2PC提交源码流程
上节课我们分析到了GlobalTransactionalInterceptor全局事务拦截器一旦执行拦截器我们就会进入到其中的invoke方法在这其中会做一些GlobalTransactional注解的判断如果有注解以后会执行全局事务和全局锁那么在执行全局事务的时候会调用handleGlobalTransaction全局事务处理器这里主要是获取事务信息
Object handleGlobalTransaction(final MethodInvocation methodInvocation,final GlobalTransactional globalTrxAnno) throws Throwable {boolean succeed true;try {return transactionalTemplate.execute(new TransactionalExecutor() {Overridepublic Object execute() throws Throwable {return methodInvocation.proceed();}// 获取事务名称默认获取方法名public String name() {String name globalTrxAnno.name();if (!StringUtils.isNullOrEmpty(name)) {return name;}return formatMethod(methodInvocation.getMethod());}/*** 解析GlobalTransactional注解属性封装为对象* return*/Overridepublic TransactionInfo getTransactionInfo() {// reset the value of timeout// 获取超时时间默认60秒int timeout globalTrxAnno.timeoutMills();if (timeout 0 || timeout DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {timeout defaultGlobalTransactionTimeout;}// 构建事务信息对象TransactionInfo transactionInfo new TransactionInfo();transactionInfo.setTimeOut(timeout);// 超时时间transactionInfo.setName(name()); // 事务名称transactionInfo.setPropagation(globalTrxAnno.propagation());// 事务传播transactionInfo.setLockRetryInternal(globalTrxAnno.lockRetryInternal());// 校验或占用全局锁重试间隔transactionInfo.setLockRetryTimes(globalTrxAnno.lockRetryTimes());// 校验或占用全局锁重试次数SetRollbackRule rollbackRules new LinkedHashSet();// 其他构建信息for (Class? rbRule : globalTrxAnno.rollbackFor()) {rollbackRules.add(new RollbackRule(rbRule));}for (String rbRule : globalTrxAnno.rollbackForClassName()) {rollbackRules.add(new RollbackRule(rbRule));}for (Class? rbRule : globalTrxAnno.noRollbackFor()) {rollbackRules.add(new NoRollbackRule(rbRule));}for (String rbRule : globalTrxAnno.noRollbackForClassName()) {rollbackRules.add(new NoRollbackRule(rbRule));}transactionInfo.setRollbackRules(rollbackRules);return transactionInfo;}});} catch (TransactionalExecutor.ExecutionException e) {// 执行异常TransactionalExecutor.Code code e.getCode();switch (code) {case RollbackDone:throw e.getOriginalException();case BeginFailure:succeed false;failureHandler.onBeginFailure(e.getTransaction(), e.getCause());throw e.getCause();case CommitFailure:succeed false;failureHandler.onCommitFailure(e.getTransaction(), e.getCause());throw e.getCause();case RollbackFailure:failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());throw e.getOriginalException();case RollbackRetrying:failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());throw e.getOriginalException();default:throw new ShouldNeverHappenException(String.format(Unknown TransactionalExecutor.Code: %s, code));}} finally {if (degradeCheck) {EVENT_BUS.post(new DegradeCheckEvent(succeed));}}
}在这其中我们要关注一个重点方法execute()
其实这个方法主要的作用就是执行事务的流程大概一下几点
获取事务信息开始执行全局事务发生异常全局回滚各个数据通过undo_log表进行事务补偿全局事务提交清除所有资源
这个位置是非常核心的一个位置因为我们所有的业务进来以后都会走这个位置。
这其中的第三步和第四步就是在想TCSeata-Server发起全局事务的提交/回滚
public Object execute(TransactionalExecutor business) throws Throwable {// 1. Get transactionInfo// 获取事务信息TransactionInfo txInfo business.getTransactionInfo();if (txInfo null) {throw new ShouldNeverHappenException(transactionInfo does not exist);}// 1.1 Get current transaction, if not null, the tx role is GlobalTransactionRole.Participant.// 获取当前事务主要获取XidGlobalTransaction tx GlobalTransactionContext.getCurrent();// 1.2 Handle the transaction propagation.// 根据配置的不同事务传播行为执行不同的逻辑Propagation propagation txInfo.getPropagation();SuspendedResourcesHolder suspendedResourcesHolder null;try {switch (propagation) {case NOT_SUPPORTED:// If transaction is existing, suspend it.if (existingTransaction(tx)) {suspendedResourcesHolder tx.suspend();}// Execute without transaction and return.return business.execute();case REQUIRES_NEW:// If transaction is existing, suspend it, and then begin new transaction.if (existingTransaction(tx)) {suspendedResourcesHolder tx.suspend();tx GlobalTransactionContext.createNew();}// Continue and execute with new transactionbreak;case SUPPORTS:// If transaction is not existing, execute without transaction.if (notExistingTransaction(tx)) {return business.execute();}// Continue and execute with new transactionbreak;case REQUIRED:// If current transaction is existing, execute with current transaction,// else continue and execute with new transaction.break;case NEVER:// If transaction is existing, throw exception.if (existingTransaction(tx)) {throw new TransactionException(String.format(Existing transaction found for transaction marked with propagation never, xid %s, tx.getXid()));} else {// Execute without transaction and return.return business.execute();}case MANDATORY:// If transaction is not existing, throw exception.if (notExistingTransaction(tx)) {throw new TransactionException(No existing transaction found for transaction marked with propagation mandatory);}// Continue and execute with current transaction.break;default:throw new TransactionException(Not Supported Propagation: propagation);}// 1.3 If null, create new transaction with role GlobalTransactionRole.Launcher.// 当前没有事务则创建一个新的事务if (tx null) {tx GlobalTransactionContext.createNew();}// set current tx config to holderGlobalLockConfig previousConfig replaceGlobalLockConfig(txInfo);try {// 2. If the tx role is GlobalTransactionRole.Launcher, send the request of beginTransaction to TC,// else do nothing. Of course, the hooks will still be triggered.// 开始执行全局事务beginTransaction(txInfo, tx);Object rs;try {// Do Your Business// 执行当前业务逻辑// 1. 在TC注册当前分支事务TC会在branch_table中插入一条分支事务数据// 2. 执行本地update语句并在执行前后查询数据状态并把数据前后镜像存入到undo_log表中// 3. 远程调用其他应用远程应用接收到xid也会注册分支事务写入branch_table及本地undo_log表// 4. 会在lock_table表中插入全局锁数据一个分支一条rs business.execute();} catch (Throwable ex) {// 3. The needed business exception to rollback.// 发生异常全局回滚各个数据通过undo_log表进行事务补偿completeTransactionAfterThrowing(txInfo, tx, ex);throw ex;}// 4. everything is fine, commit.// 全局提交事务commitTransaction(tx);return rs;} finally {//5. clear// 清除所有资源resumeGlobalLockConfig(previousConfig);triggerAfterCompletion();cleanUp();}} finally {// If the transaction is suspended, resume it.if (suspendedResourcesHolder ! null) {tx.resume(suspendedResourcesHolder);}}
}如何发起全局事务
这个位置我们就看当前这个代码中的 beginTransaction(txInfo, tx);方法
// 想TC发起请求这里采用了模板模式
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {try {triggerBeforeBegin();// 对TC发起请求tx.begin(txInfo.getTimeOut(), txInfo.getName());triggerAfterBegin();} catch (TransactionException txe) {throw new TransactionalExecutor.ExecutionException(tx, txe,TransactionalExecutor.Code.BeginFailure);}
} 那我们向下来看begin方法那要注意这里调用begin方法的是DefaultGlobalTransaction
Override
public void begin(int timeout, String name) throws TransactionException {//判断调用者是否是TMif (role ! GlobalTransactionRole.Launcher) {assertXIDNotNull();if (LOGGER.isDebugEnabled()) {LOGGER.debug(Ignore Begin(): just involved in global transaction [{}], xid);}return;}assertXIDNull();String currentXid RootContext.getXID();if (currentXid ! null) {throw new IllegalStateException(Global transaction already exists, cant begin a new global transaction, currentXid currentXid);}// 获取Xidxid transactionManager.begin(null, null, name, timeout);status GlobalStatus.Begin;RootContext.bind(xid);if (LOGGER.isInfoEnabled()) {LOGGER.info(Begin new global transaction [{}], xid);}
}在向下来看begin方法这时候使用的是(默认事务管理者)DefaultTransactionManager.begin来真正的获取xid其中就是传入事务的相关信息最终TC端返回对应的全局事务Xid。
Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException {GlobalBeginRequest request new GlobalBeginRequest();request.setTransactionName(name);request.setTimeout(timeout);// 发送请求得到响应GlobalBeginResponse response (GlobalBeginResponse) syncCall(request);if (response.getResultCode() ResultCode.Failed) {throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());}//返回Xidreturn response.getXid();
}这里采用的是Netty的通讯方式
private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {try {// 通过Netty发送请求return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);} catch (TimeoutException toe) {throw new TmTransactionException(TransactionExceptionCode.IO, RPC timeout, toe);}
}图解地址https://www.processon.com/view/link/6213d58f1e0853078013c58f