广州微网站建设哪家好,免费门户网站开发,深圳一百讯网站建设,网站权重一般有几个等级文章目录一、前言二、Seata saga模式介绍1、示例状态图2、“状态机”介绍1#xff09;“状态机”属性2#xff09;“状态”属性3#xff09;更多状态相关内容三、SpringCloud 集成 seata saga1、saga模式状态机相关信息1#xff09;状态机配置相关的三个表2#xff09;状态…
文章目录一、前言二、Seata saga模式介绍1、示例状态图2、“状态机”介绍1“状态机”属性2“状态”属性3更多状态相关内容三、SpringCloud 集成 seata saga1、saga模式状态机相关信息1状态机配置相关的三个表2状态图2、项目代码0pom.xml1线程池配置 -- MyThreadFactory2seata saga相关配置 -- SagaConfiguration3库存服务 -- InventoryServiceInventoryServiceImpl4账户余额服务 -- BalanceServiceBalanceServiceImpl5启动类 -- SagaTradeApplication6 状态图对应的JSON文件 -- reduce_inventory_and_balance.json状态图流程解析7application.yml8file.conf9开启状态机入口 -- TradeController3、测试 / 验证1启动seata-server服务2启动seata-clientsaga-trade3事务提交4事务回滚三、总结一、前言
更多内容见Seata专栏https://blog.csdn.net/saintmm/category_11953405.html
至此seata系列的内容已出 can not get cluster name in registry config ‘service.vgroupMapping.xx‘, please make sure registry问题解决Seata Failed to get available servers: endpoint format should like ip:port 报错原因/解决方案汇总版看完本文必解决问题Seata json decode exception, Cannot construct instance of java.time.LocalDateTime报错原因/解决方案最全汇总版超细的Spring Cloud 整合Seata实现分布式事务排坑版Spring Cloud整合Seata、Nacos实现分布式事务案例巨细排坑版【云原生】分布式事务Seata源码解析一在IDEA中启动Seata Server分布式事务Seata源码解析二Seata Server启动时都做了什么分布式事务Seata源码解析三从Spring Boot特性来看Seata Client 启动时都做了什么分布式事务Seata源码解析四图解Seata Client 如何与Seata Server建立连接、通信分布式事务Seata源码解析五GlobalTransactional如何开启全局事务分布式事务Seata源码解析六全局/分支事务分布式ID如何生成序列号超了怎么办时钟回拨问题如何处理分布式事务Seata源码解析七图解Seata事务执行流程之开启全局事务分布式事务Seata源码解析八本地事务执行流程(AT模式下)分布式事务Seata源码解析九分支事务如何注册到全局事务分布式事务Seata源码解析十AT模式回滚日志undo log详细构建过程分布式事务Seata源码解析11全局事务执行流程之两阶段全局事务提交分布式事务Seata源码解析12全局事务执行流程之全局事务回滚Spring Cloud整合Seata实现TCC分布式事务模式案例分布式事务Seata源码解析13TCC事务模式实现原理分布式事务Seata TCC空回滚/幂等/悬挂问题、解决方案seata1.5.1如何解决Seata XA模式概述案例saga模式、Seata saga模式详介 至此Seata常用的AT模式、TCC模式 和 XA模式已完结SAGA模式也已做了基本介绍本文接着聊Spring Cloud 如何集成Seata saga模式实现全局事务、分支事务
二、Seata saga模式介绍
官方文档地址https://seata.io/zh-cn/docs/user/saga.html
Seata提供的Saga模式目前只能通过状态机引擎来实现整体机制为
通过状态图来定义服务调用的流程并生成 json 状态语言定义文件 换言之需要开发者手工的进行Saga业务流程绘制并将其转换为JSON配置文件 状态图中一个节点可以是调用一个服务节点可以配置它的补偿节点 注意: 异常发生时是否进行补偿也可由用户自定义决定可以选择不配置 状态图 json 由状态机引擎驱动执行当出现异常时状态引擎反向执行已成功节点对应的补偿节点将事务回滚 在程序启动时会根据saga状态图加载业务处理流程包括服务补偿处理 可以实现服务编排需求支持单项选择、并发、子流程、参数转换、参数映射、服务执行状态判断、异常捕获等功能
1、示例状态图 2、“状态机”介绍
seata saga的状态语言在一定程度上参考了AWS Step Functions
1“状态机”属性
Name: 表示状态机的名称必须唯一Comment: 状态机的描述Version: 状态机定义版本StartState: 启动时运行的第一个状态States: 状态列表是一个map结构key是状态的名称在状态机内必须唯一IsRetryPersistModeUpdate: 向前重试时, 日志是否基于上次失败日志进行更新IsCompensatePersistModeUpdate: 向后补偿重试时, 日志是否基于上次补偿日志进行更新
2“状态”属性
Type: “状态” 的类型比如有: ServiceTask: 执行调用服务任务Choice: 单条件选择路由CompensationTrigger: 触发补偿流程Succeed: 状态机正常结束Fail: 状态机异常结束SubStateMachine: 调用子状态机CompensateSubMachine: 用于补偿一个子状态机 ServiceName: 服务名称通常是服务的beanId也就是Spring容器中的beanName 无论是SpringCloud还是Dubbo、HSF…最重要的就是配置这个beanId。 ServiceMethod: 服务方法名称也就是Spring Bean中的某个方法名CompensateState: 该状态的补偿状态Loop: 标识该事务节点是否为循环事务, 即由框架本身根据循环属性的配置, 遍历集合元素对该事务节点进行循环执行Input: 调用服务的输入参数列表, 是一个数组, 对应于服务方法的参数列表, $.表示使用表达式从状态机上下文中取参数表达使用 SpringEL, 如果是常量直接写值即可Ouput: 将服务返回的参数赋值到状态机上下文中, 是一个map结构key为放入到状态机上文时的key状态机上下文也是一个mapvalue中$.是表示SpringEL表达式表示从服务的返回参数中取值#root表示服务的整个返回参数Status: 服务执行状态映射框架定义了三个状态SU 成功、FA 失败、UN 未知, 我们需要把服务执行的状态映射成这三个状态帮助框架判断整个事务的一致性是一个map结构key是条件表达式一般是取服务的返回值或抛出的异常进行判断默认是SpringEL表达式判断服务返回参数带$Exception{开头表示判断异常类型。value是当这个条件表达式成立时则将服务执行状态映射成这个值Catch: 捕获到异常后的路由Next: 服务执行完成后下一个执行的状态Choices: Choice类型的状态里, 可选的分支列表, 分支中的Expression为SpringEL表达式, Next为当表达式成立时执行的下一个状态ErrorCode: Fail类型状态的错误码Message: Fail类型状态的错误信息
3更多状态相关内容
更多详细的状态语言使用示例见github https://github.com/seata/seata/tree/develop/test/src/test/java/io/seata/saga/engine
三、SpringCloud 集成 seata saga
官方提供的saga案例地址https://github.com/seata/seata-samples/tree/master/saga 然而并没有提供SpringCloud与saga模式集成的案例以下介绍SpringCloud与saga模式集成的案例。
1、saga模式状态机相关信息
1状态机配置相关的三个表
首先我们需要 在使用状态机开启saga分支事务的 服务对应的数据库连接中创建三个表以MYSQL为例
CREATE TABLE IF NOT EXISTS seata_state_machine_def
(id VARCHAR(32) NOT NULL COMMENT id,name VARCHAR(128) NOT NULL COMMENT name,tenant_id VARCHAR(32) NOT NULL COMMENT tenant id,app_name VARCHAR(32) NOT NULL COMMENT application name,type VARCHAR(20) COMMENT state language type,comment_ VARCHAR(255) COMMENT comment,ver VARCHAR(16) NOT NULL COMMENT version,gmt_create DATETIME(3) NOT NULL COMMENT create time,status VARCHAR(2) NOT NULL COMMENT status(AC:active|IN:inactive),content TEXT COMMENT content,recover_strategy VARCHAR(16) COMMENT transaction recover strategy(compensate|retry),PRIMARY KEY (id)
) ENGINE InnoDBDEFAULT CHARSET utf8mb4;CREATE TABLE IF NOT EXISTS seata_state_machine_inst
(id VARCHAR(128) NOT NULL COMMENT id,machine_id VARCHAR(32) NOT NULL COMMENT state machine definition id,tenant_id VARCHAR(32) NOT NULL COMMENT tenant id,parent_id VARCHAR(128) COMMENT parent id,gmt_started DATETIME(3) NOT NULL COMMENT start time,business_key VARCHAR(48) COMMENT business key,start_params TEXT COMMENT start parameters,gmt_end DATETIME(3) COMMENT end time,excep BLOB COMMENT exception,end_params TEXT COMMENT end parameters,status VARCHAR(2) COMMENT status(SU succeed|FA failed|UN unknown|SK skipped|RU running),compensation_status VARCHAR(2) COMMENT compensation status(SU succeed|FA failed|UN unknown|SK skipped|RU running),is_running TINYINT(1) COMMENT is running(0 no|1 yes),gmt_updated DATETIME(3) NOT NULL,PRIMARY KEY (id),UNIQUE KEY unikey_buz_tenant (business_key, tenant_id)
) ENGINE InnoDBDEFAULT CHARSET utf8mb4;CREATE TABLE IF NOT EXISTS seata_state_inst
(id VARCHAR(48) NOT NULL COMMENT id,machine_inst_id VARCHAR(128) NOT NULL COMMENT state machine instance id,name VARCHAR(128) NOT NULL COMMENT state name,type VARCHAR(20) COMMENT state type,service_name VARCHAR(128) COMMENT service name,service_method VARCHAR(128) COMMENT method name,service_type VARCHAR(16) COMMENT service type,business_key VARCHAR(48) COMMENT business key,state_id_compensated_for VARCHAR(50) COMMENT state compensated for,state_id_retried_for VARCHAR(50) COMMENT state retried for,gmt_started DATETIME(3) NOT NULL COMMENT start time,is_for_update TINYINT(1) COMMENT is service for update,input_params TEXT COMMENT input parameters,output_params TEXT COMMENT output parameters,status VARCHAR(2) NOT NULL COMMENT status(SU succeed|FA failed|UN unknown|SK skipped|RU running),excep BLOB COMMENT exception,gmt_updated DATETIME(3) COMMENT update time,gmt_end DATETIME(3) COMMENT end time,PRIMARY KEY (id, machine_inst_id)
) ENGINE InnoDBDEFAULT CHARSET utf8mb4;数据库表的出处见seata官方地址https://github.com/seata/seata/blob/1.5.2/script/client/saga/db/mysql.sql 2状态图
状态机设计器演示在线画图工具地址http://seata.io/saga_designer/index.html 2、项目代码
整体代码结构 此处案例和saga官方提供的一样仅示范saga模式的使用不涉及RPC、业务表操作若读者想丰富案例可在笔者的todo标注处自行添加。
0pom.xml
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.3.12.RELEASE/versionrelativePath/ !-- lookup parent from repository --/parentmodelVersion4.0.0/modelVersionversion0.0.1-SNAPSHOT/versiongroupIdcom.saint/groupIdartifactIdsaga-trade/artifactIdpropertiesjava.version1.8/java.versiondruid.version1.2.8/druid.versionmysql.version8.0.22/mysql.version!--seata1.5.2 版本源码验证--spring-boot.version2.3.12.RELEASE/spring-boot.versionspring-cloud.versionHoxton.SR12/spring-cloud.versionspring-cloud-alibaba.version2.2.9.RELEASE/spring-cloud-alibaba.versiondruid.version1.2.8/druid.versionmysql.version8.0.22/mysql.version/propertiesdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdscoperuntime/scope/dependencydependencygroupIdcom.alibaba/groupIdartifactIddruid-spring-boot-starter/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-data-jpa/artifactId/dependencydependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-seata/artifactId/dependencydependencygroupIdorg.apache.commons/groupIdartifactIdcommons-lang3/artifactIdversion3.12.0/version/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdoptionaltrue/optional/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion2.0.10/version/dependency/dependenciesdependencyManagementdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-dependencies/artifactIdversion${spring-boot.version}/versiontypepom/typescopeimport/scope/dependency!--整合spring cloud--dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-dependencies/artifactIdversion${spring-cloud.version}/versiontypepom/typescopeimport/scope/dependency!--整合spring cloud alibaba--dependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-alibaba-dependencies/artifactIdversion${spring-cloud-alibaba.version}/versiontypepom/typescopeimport/scope/dependencydependencygroupIdcom.alibaba/groupIdartifactIddruid-spring-boot-starter/artifactIdversion${druid.version}/version/dependencydependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion${mysql.version}/version/dependency/dependencies/dependencyManagementbuildpluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactIdversion${spring-boot.version}/versionconfigurationexcludesexcludegroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/exclude/excludes/configuration/plugin/plugins/build
/project1线程池配置 – MyThreadFactory
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;/*** 自定义线程工厂*/
public class MyThreadFactory implements ThreadFactory {private final AtomicInteger threadNumber;private ThreadGroup group;private String namePrefix;public MyThreadFactory(String namePrefix) {this.threadNumber new AtomicInteger(1);SecurityManager s System.getSecurityManager();this.group s ! null ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();this.namePrefix namePrefix _THREAD_;}Overridepublic Thread newThread(Runnable r) {Thread t new Thread(this.group, r, this.namePrefix this.threadNumber.getAndIncrement(), 0L);return t;}
}2seata saga相关配置 – SagaConfiguration
import com.alibaba.druid.pool.DruidDataSource;
import io.seata.saga.engine.config.DbStateMachineConfig;
import io.seata.saga.engine.impl.ProcessCtrlStateMachineEngine;
import io.seata.saga.rm.StateMachineEngineHolder;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.sql.DataSource;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** author Saint*/
Configuration
public class SagaConfiguration {BeanConfigurationProperties(prefix spring.datasource)public DataSource dataSource() {return new DruidDataSource();}Beanpublic ThreadPoolExecutor sagaThreadPool() {ThreadPoolExecutor executor new ThreadPoolExecutor(1,20,30, TimeUnit.SECONDS,new LinkedBlockingQueue(2000),new MyThreadFactory(SAGA_ASYNC_EXE_),new ThreadPoolExecutor.AbortPolicy());return executor;}Beanpublic DbStateMachineConfig dbStateMachineConfig() {DbStateMachineConfig config new DbStateMachineConfig();config.setDataSource(dataSource());config.setResources(new String[]{statelang/*.json});config.setEnableAsync(true);config.setApplicationId(saga-trade);config.setTxServiceGroup(saint-trade-tx-group);config.setThreadPoolExecutor(sagaThreadPool());return config;}Beanpublic ProcessCtrlStateMachineEngine stateMachineEngine() {ProcessCtrlStateMachineEngine engine new ProcessCtrlStateMachineEngine();engine.setStateMachineConfig(dbStateMachineConfig());return engine;}Beanpublic StateMachineEngineHolder stateMachineEngineHolder() {StateMachineEngineHolder holder new StateMachineEngineHolder();holder.setStateMachineEngine(stateMachineEngine());return holder;}
}3库存服务 – InventoryService
InventoryService提供了两个方法一个reduce()、一个reduce()对应的补偿方法compensateReduce()
package com.saint.saga.trade.service;/*** Inventory Actions*/
public interface InventoryService {/*** reduce** param businessKey 业务上的唯一标识* param count* return*/boolean reduce(String businessKey, int count);/*** increase** param businessKey 业务上的唯一标识* return*/boolean compensateReduce(String businessKey);
}InventoryServiceImpl
package com.saint.saga.trade.service.impl;import com.saint.saga.trade.service.InventoryService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;/*** 库存** author Saint*/
Service(value inventoryService)
public class InventoryServiceImpl implements InventoryService {private static final Logger LOGGER LoggerFactory.getLogger(InventoryActionImpl.class);Overridepublic boolean reduce(String businessKey, int count) {LOGGER.info(reduce inventory succeed, count: count , businessKey: businessKey);// todo rpc / httpreturn true;}Overridepublic boolean compensateReduce(String businessKey) {LOGGER.info(compensate reduce inventory succeed, businessKey: businessKey);// todo rpc / httpreturn true;}
}4账户余额服务 – BalanceService
BalanceService提供了两个方法一个reduce()、一个reduce()对应的补偿方法compensateReduce()
package com.saint.saga.trade.service;import java.math.BigDecimal;
import java.util.Map;/*** Balance Actions*/
public interface BalanceService {/*** reduce** param businessKey 业务上的唯一标识* param amount* param params* return*/boolean reduce(String businessKey, BigDecimal amount, MapString, Object params);/*** compensateReduce** param businessKey 业务上的唯一标识* param params* return*/boolean compensateReduce(String businessKey, MapString, Object params);}BalanceServiceImpl
package com.saint.saga.trade.service.impl;import com.saint.saga.trade.service.BalanceService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;import java.math.BigDecimal;
import java.util.Map;/*** 账户余额** author Saint*/
Service(value balanceService)
public class BalanceServiceImpl implements BalanceService {private static final Logger LOGGER LoggerFactory.getLogger(BalanceServiceImpl.class);Overridepublic boolean reduce(String businessKey, BigDecimal amount, MapString, Object params) {if (params ! null) {Object throwException params.get(throwException);if (throwException ! null true.equals(throwException.toString())) {throw new RuntimeException(reduce balance failed);}}LOGGER.info(reduce balance succeed, amount: amount , bizCode: businessKey);// todo rpc / httpreturn true;}Overridepublic boolean compensateReduce(String businessKey, MapString, Object params) {if (params ! null) {Object throwException params.get(throwException);if (throwException ! null true.equals(throwException.toString())) {throw new RuntimeException(compensate reduce balance failed);}}LOGGER.info(compensate reduce balance succeed, businessKey: businessKey);// todo rpc / httpreturn true;}
}5启动类 – SagaTradeApplication
SpringBootApplication
public class SagaTradeApplication {public static void main(String[] args) {ConfigurableApplicationContext run SpringApplication.run(SagaTradeApplication.class, args);InventoryService bean run.getBean(InventoryService.class);BalanceService bean1 run.getBean(BalanceService.class);}
}6 状态图对应的JSON文件 – reduce_inventory_and_balance.json
{Name: reduceInventoryAndBalance,Comment: reduce inventory then reduce balance in a transaction,StartState: ReduceInventory,Version: 0.0.1,States: {ReduceInventory: {Type: ServiceTask,ServiceName: inventoryService,ServiceMethod: reduce,CompensateState: CompensateReduceInventory,Next: ChoiceState,Input: [$.[businessKey],$.[count]],Output: {reduceInventoryResult: $.#root},Status: {#root true: SU,#root false: FA,$Exception{java.lang.Throwable}: UN}},ChoiceState: {Type: Choice,Choices: [{Expression: [reduceInventoryResult] true,Next: ReduceBalance}],Default: Fail},ReduceBalance: {Type: ServiceTask,ServiceName: balanceService,ServiceMethod: reduce,CompensateState: CompensateReduceBalance,Input: [$.[businessKey],$.[amount],{throwException: $.[mockReduceBalanceFail]}],Output: {compensateReduceBalanceResult: $.#root},Status: {#root true: SU,#root false: FA,$Exception{java.lang.Throwable}: UN},Catch: [{Exceptions: [java.lang.Throwable],Next: CompensationTrigger}],Next: Succeed},CompensateReduceInventory: {Type: ServiceTask,ServiceName: inventoryService,ServiceMethod: compensateReduce,Input: [$.[businessKey]]},CompensateReduceBalance: {Type: ServiceTask,ServiceName: balanceService,ServiceMethod: compensateReduce,Input: [$.[businessKey]]},CompensationTrigger: {Type: CompensationTrigger,Next: Fail},Succeed: {Type: Succeed},Fail: {Type: Fail,ErrorCode: PURCHASE_FAILED,Message: purchase failed}}
}状态图流程解析 7application.yml
server:port: 9099
spring:application:name: saga-tradedatasource:url: jdbc:mysql://127.0.0.1:3306/seata_saga?useUnicodetruecharacterEncodingutf8allowMultiQueriestrueuseSSLfalseusername: rootpassword: 123456driver-class-name: com.mysql.cj.jdbc.Driverjpa:show-sql: trueseata:tx-service-group: saint-trade-tx-group8file.conf
transport {# tcp udt unix-domain-sockettype TCP#NIO NATIVEserver NIO#enable heartbeatheartbeat true# the client batch send request enableenableClientBatchSendRequest true#thread factory for nettythreadFactory {bossThreadPrefix NettyBossworkerThreadPrefix NettyServerNIOWorkerserverExecutorThread-prefix NettyServerBizHandlershareBossWorker falseclientSelectorThreadPrefix NettyClientSelectorclientSelectorThreadSize 1clientWorkerThreadPrefix NettyClientWorkerThread# netty boss thread size,will not be used for UDTbossThreadSize 1#auto default pin or 8workerThreadSize default}shutdown {# when destroy server, wait secondswait 3}serialization seatacompressor none
}
service {#transaction service group mappingvgroupMapping.saint-trade-tx-group seata-server-sh#only support when registry.typefile, please dont set multiple addressesseata-server-sh.grouplist 127.0.0.1:8091#degrade, current not supportenableDegrade false#disable seatadisableGlobalTransaction false
}client {rm {asyncCommitBufferLimit 10000lock {retryInterval 10retryTimes 30retryPolicyBranchRollbackOnConflict true}reportRetryCount 5tableMetaCheckEnable falsereportSuccessEnable false}tm {commitRetryCount 5rollbackRetryCount 5}undo {dataValidation truelogSerialization jacksonlogTable undo_log}log {exceptionRate 100}
}9开启状态机入口 – TradeController
状态机支持两种执行方式同步执行、异步执行 同步执行APIStateMachineEngine#startWithBusinessKey();异步执行APIStateMachineEngine#startWithBusinessKeyAsync(…, AsyncCallback) 其中的AsyncCallback为异步执行结束之后的回调函数。
package com.saint.saga.trade.controller;import io.seata.saga.engine.AsyncCallback;
import io.seata.saga.engine.StateMachineEngine;
import io.seata.saga.proctrl.ProcessContext;
import io.seata.saga.statelang.domain.ExecutionStatus;
import io.seata.saga.statelang.domain.StateMachineInstance;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;/*** author Saint*/
RestController
RequestMapping(saga)
Slf4j
public class TradeController {Autowiredprivate StateMachineEngine stateMachineEngine;/*** POST请求 http://localhost:9099/saga/commit?amount50count2*/RequestMapping(value /commit, method RequestMethod.POST)public String commit(Integer amount, Integer count) {String businessKey String.valueOf(System.currentTimeMillis());MapString, Object startParams generateStartParams(amount, count, false);// 1、sync testStateMachineInstance instance stateMachineEngine.startWithBusinessKey(reduceInventoryAndBalance, null,businessKey, startParams);// 2、async test
// StateMachineInstance instance stateMachineEngine.startWithBusinessKeyAsync(reduceInventoryAndBalance, null, businessKey, startParams,
// CALL_BACK);
// waitingForFinish(instance);// PS: instance is not nullif (!ExecutionStatus.SU.equals(instance.getStatus())) {log.error(saga transaction execute failed. XID: {}, instance.getId());return rollback;}log.info(saga transaction commit succeed. XID: {}, instance.getId());return succeed;}/*** POST请求 http://localhost:9099/saga/rollback?amount50count2*/RequestMapping(value /rollback, method RequestMethod.POST)public String rollback(Integer amount, Integer count) {String businessKey String.valueOf(System.currentTimeMillis());// unique difference is hereMapString, Object startParams generateStartParams(amount, count, true);// 1、sync testStateMachineInstance instance stateMachineEngine.startWithBusinessKey(reduceInventoryAndBalance, null,businessKey, startParams);// 2、async test
// StateMachineInstance instance stateMachineEngine.startWithBusinessKeyAsync(reduceInventoryAndBalance, null, businessKey, startParams,
// CALL_BACK);
// waitingForFinish(instance);// PS: instance is not nullif (!ExecutionStatus.SU.equals(instance.getStatus())) {log.error(saga transaction execute failed. XID: {}, instance.getId());return rollback;}log.info(saga transaction commit succeed. XID: {}, instance.getId());return succeed;}/*** parameters to be used in the state machine状态机需要用到的参数在这里组装*/private MapString, Object generateStartParams(Integer amount, Integer count, Boolean mockFail) {String businessKey String.valueOf(System.currentTimeMillis());MapString, Object startParams new HashMap(8);startParams.put(businessKey, businessKey);startParams.put(count, 10);startParams.put(amount, new BigDecimal(String.valueOf(amount)));if (mockFail)startParams.put(mockReduceBalanceFail, true);return startParams;}private static volatile Object lock new Object();private static AsyncCallback CALL_BACK new AsyncCallback() {Overridepublic void onFinished(ProcessContext context, StateMachineInstance stateMachineInstance) {synchronized (lock) {lock.notifyAll();}}Overridepublic void onError(ProcessContext context, StateMachineInstance stateMachineInstance, Exception exp) {synchronized (lock) {lock.notifyAll();}}};private static void waitingForFinish(StateMachineInstance inst) {synchronized (lock) {if (!ExecutionStatus.RU.equals(inst.getStatus()))return;try {lock.wait();} catch (InterruptedException e) {log.error(occur exception, , e);}}}
}3、测试 / 验证
1启动seata-server服务
参考博文超细的Spring Cloud 整合Seata实现分布式事务排坑版进行seata-server的配置和启动 注意本文使用的seata版本是1.5.2切勿使用成参考博文中的1.3.0。 seata server1.5.2启动成功后控制台输出 2启动seata-clientsaga-trade 3事务提交
执行 POST类型请求http://localhost:9099/saga/commit?amount50count2 saga-trade控制台输出 seata-server日志 4事务回滚
执行 POST类型请求http://localhost:9099/saga/commit?amount50count2 saga-trade控制台输出 seata-server日志 三、总结
seata的saga模式适用于长流程 或 长事务场景。saga模式复杂的地方在于引入状态机需要自己根据业务定义状态机的流程然后把定义好的流程用json文件导入到工程中。
此外saga模式需要开发者自定义回滚事件并要考虑空补偿、悬挂、幂等三种问题即允许空补偿、做防悬挂控制、做幂等控制。读者可以参考TCC模式中的解决方案分布式事务Seata TCC空回滚/幂等/悬挂问题、解决方案seata1.5.1如何解决实现