前言 本文要求对Springcloud有一定了解,对分布式事务本身有一定认知,如果不了解微服务,建议先看看Spring Cloud的基本认识和使用Spring Cloud的基本教程 ,再回头学习本文
为什么会出现分布式事务 开发过程中,随着项目模块的增加以及分库分表的出现,传统事务已经无法满足业务需求,如分库,由于有多个数据源,而数据库事务又是基于数据库层,所以如果只用数据库原生事务,会导致数据库A成功提交,数据库B回滚,导致数据不一致,又比如多模块下,常见的订单流程,订单服务成功提交订单,调用库存服务扣减库存,由于是链式调用,库存成功扣减,然后回到订单服务时,出现异常,导致订单回滚,但是此时库存却未回滚,也会导致数据不一致,所以这些情况都需要分布式事务来解决这个问题(当然一般开发中,我们常用的做法是能避免就尽量避免,实在避免不了才使用分布式事务,因为分布式事务不管怎么样,性能,一致性,原子性等都会收到影响)
分布式事务目前的几种方案 2PC(二阶段提交)
3PC(三阶段提交)
TCC(Try - Confirm - Cancel)
最终一致性(消息队列等方式)
最大努力通知(数据不要求强一致)
每种方案都各有优劣,具体采用何种方案还需要根据实际业务场景来使用
初识Seata Seata 是一款阿里巴巴开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。详情可以去查看seata官方文档
初始Nacos Nacos 是一款阿里巴巴开源的服务注册中心,配置中心,管理中心,以下是nacos官网简介:
Nacos 致力于帮助您发现、配置和管理微服务。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据及流量管理。
Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。 Nacos 是构建以“服务”为中心的现代应用架构 (例如微服务范式、云原生范式) 的服务基础设施。
我们可以把nacos当作Eureka +Srpingcloud config的结合体,详情可以查去查看nacos官方文档
下载并部署Nacos Nacos跟Eureka 不一样,Eureka是创建一个Eureka 的项目即可,但是Nacos需要专门下载Nacos的服务端,可以从下载地址 去下载最新的稳定版,目前编写本文时最新版为2.0.0-ALPHA.2,下载好之后解压,进入bin目录,并根据系统执行相关的脚本启动服务,记得脚本后面加上参数
standalone代表着单机模式运行,非集群模式,由于本文只讲解单机模式,所以不用集群模式,集群模式还要配置其他东西,启动成功后浏览器访问http://localhost:8848/nacos ,能看到登录界面,输入默认账号密码nacos(账号密码都是nacos),能进入首页,如下图
下载并部署Seata 下载并配置
从 https://github.com/seata/seata/releases ,下载服务器软件包,将其解压缩,本文编写时最新版为1.4.0,进入conf目录,编辑registry.conf,配置Seata的registry和config为nacos模式,同时配置nacos相关配置,首先修改registry(注册中心)节点下的type值为nacos,然后配置register节点下的nacos节点。
旧配置:
1 2 3 4 5 6 7 8 9 nacos { application = "seata-server" serverAddr = "127.0.0.1:8848" group = "SEATA_GROUP" namespace = "" cluster = "default" username = "" password = "" }
新配置
1 2 3 4 5 6 7 8 9 nacos { application = "seata-server" serverAddr = "127.0.0.1:8848" group = "SEATA_GROUP" namespace = "public" cluster = "default" username = "nacos" password = "nacos" }
application 为Seata-server注册到nacos的应用名称,serverAddr 为nacos的地址,group为注册到nacos的应用分组,要使用seata,必须要seata-server跟其他使用使用seata分布式事务的应用在同一分组才行,namespace 为命名空间,我们使用默认的public,当然也可以创建其他命名空间来使用,username和password对应的事nacos的账号密码
修改config(配置中心)节点下type为nacos,同时修改config节点下的nacos节点
修改前配置
1 2 3 4 5 6 7 nacos { serverAddr = "127.0.0.1:8848" namespace = "" group = "SEATA_GROUP" username = "" password = "" }
修改后
1 2 3 4 5 6 7 nacos { serverAddr = "127.0.0.1:8848" namespace = "public" group = "SEATA_GROUP" username = "nacos" password = "nacos" }
添加Seata配置 进入nacos里面,添加一个事务分组的配置(常规来说要添加很多配置,不过只要添加了事务分组配置,就可以满足基本运行要求)
点击添加按钮新增一个配置
添加如下配置:
其中配置名称
1 service.vgroupMapping.my_test_tx_group
中的my_test_tx_group是我们自定义的名称,到时候需要配置项目中,可以根据实际情况配置,Group一定要跟之前Seata中配置的group对应上,内容default要跟之前Seata中的register节点下的cluster对应上,点击发布保存配置
完整Seata配置(此步骤非必须) 以上配置只是实现分布式事务的最基本的配置,如果想要配置所有支持的参数配置,可以从github下载 配置文件config.txt,或者创建config.txt文件并复制写入下面代码(下面代码不是实时更新,所以最好还是去github下载)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 transport.type=TCP transport.server=NIO transport.heartbeat=true transport.enableClientBatchSendRequest=false transport.threadFactory.bossThreadPrefix=NettyBoss transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler transport.threadFactory.shareBossWorker=false transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector transport.threadFactory.clientSelectorThreadSize=1 transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread transport.threadFactory.bossThreadSize=1 transport.threadFactory.workerThreadSize=default transport.shutdown.wait=3 service.vgroupMapping.my_test_tx_group=default service.default.grouplist=127.0.0.1:8091 service.enableDegrade=false service.disableGlobalTransaction=false client.rm.asyncCommitBufferLimit=10000 client.rm.lock.retryInterval=10 client.rm.lock.retryTimes=30 client.rm.lock.retryPolicyBranchRollbackOnConflict=true client.rm.reportRetryCount=5 client.rm.tableMetaCheckEnable=false client.rm.tableMetaCheckerInterval=60000 client.rm.sqlParserType=druid client.rm.reportSuccessEnable=false client.rm.sagaBranchRegisterEnable=false client.tm.commitRetryCount=5 client.tm.rollbackRetryCount=5 client.tm.defaultGlobalTransactionTimeout=60000 client.tm.degradeCheck=false client.tm.degradeCheckAllowTimes=10 client.tm.degradeCheckPeriod=2000 store.mode=file store.publicKey= store.file.dir=file_store/data store.file.maxBranchSessionSize=16384 store.file.maxGlobalSessionSize=512 store.file.fileWriteBufferCacheSize=16384 store.file.flushDiskMode=async store.file.sessionReloadReadSize=100 store.db.datasource=druid store.db.dbType=mysql store.db.driverClassName=com.mysql.jdbc.Driver store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&rewriteBatchedStatements=true store.db.user=username store.db.password=password store.db.minConn=5 store.db.maxConn=30 store.db.globalTable=global_table store.db.branchTable=branch_table store.db.queryLimit=100 store.db.lockTable=lock_table store.db.maxWait=5000 store.redis.mode=single store.redis.single.host=127.0.0.1 store.redis.single.port=6379 store.redis.maxConn=10 store.redis.minConn=1 store.redis.maxTotal=100 store.redis.database=0 store.redis.password= store.redis.queryLimit=100 server.recovery.committingRetryPeriod=1000 server.recovery.asynCommittingRetryPeriod=1000 server.recovery.rollbackingRetryPeriod=1000 server.recovery.timeoutRetryPeriod=1000 server.maxCommitRetryTimeout=-1 server.maxRollbackRetryTimeout=-1 server.rollbackRetryTimeoutUnlockEnable=false client.undo.dataValidation=true client.undo.logSerialization=jackson client.undo.onlyCareUpdateColumns=true server.undo.logSaveDays=7 server.undo.logDeletePeriod=86400000 client.undo.logTable=undo_log client.undo.compress.enable=true client.undo.compress.type=zip client.undo.compress.threshold=64k log.exceptionRate=100 transport.serialization=seata transport.compressor=none metrics.enabled=false metrics.registryType=compact metrics.exporterList=prometheus metrics.exporterPrometheusPort=9898
以上配置含义建议百度或者查看官方文档确定每个配置的含义,创建好配置文件后,需要导入到nacos中,目前官方支持2种脚本导入方式,一种是python,另外一种是linux脚本,可以去github下载或者直接复制自己创建,下载地址如下:python导入脚本 和 linux导入脚本
其中linux导入脚本命令如下
1 nacos-config.sh -h 127.0.0.1 -p 8848 -g SEATA_GROUP -u nacos -w nacos
nacos-config.sh为脚本名字,如果有修改需要修改成对应的名称,-h后面是nacos的服务器地址,-p为nacos的端口地址,-u后面为nacos的用户名,-w后面为nacos的用户名密码,-g后面为配置的分组,也就是我们之前手动创建配置时填的Group对应的值。
注意:分组必须要在同一个分组内,也就是说Seata配置的分组和需要分布式事务的程序的分组以及Seata-server的分组必须要在同一个分组内
以上是完整配置的,但是如果只是想实现基本的分布式事务,可以忽略上面一步
创建表 首先Seata要处理事务,需要创建一个Seata事务的日志表(UNDO_LOG),所有库里面都需要这个表(如果每个项目连接的库不一致,那么每个项目连接的库中则都需要此表)
1 2 3 4 5 6 7 8 9 10 11 12 13 CREATE TABLE `undo_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `branch_id` bigint(20) NOT NULL, `xid` varchar(100) NOT NULL, `context` varchar(128) NOT NULL, `rollback_info` longblob NOT NULL, `log_status` int(11) NOT NULL, `log_created` datetime NOT NULL, `log_modified` datetime NOT NULL, `ext` varchar(100) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
其次创建我们的业务表,这里为了方便演示,我创建2个非常简单的表,一个用户表,一个订单表,我们保存用户时,就调用订单服务保存一个订单,当用户id为偶数时,用户服务抛异常,触发回滚(表结构非常简陋且不符合逻辑,只为演示作)
1 2 3 4 5 6 CREATE TABLE `user` ( `id` int NOT NULL AUTO_INCREMENT, `name` varchar(32) NOT NULL, `passwords` varchar(32) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8;
1 2 3 4 5 6 CREATE TABLE `trade` ( `id` int NOT NULL AUTO_INCREMENT, `userid` int NOT NULL, `value` int NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8;
创建项目 创建一个user和trade的maven项目,其中pom依赖一致
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 <properties> <java.version>1.8</java.version> <spring-cloud.version>Hoxton.SR9</spring-cloud.version> <spring.cloud.alibaba.version>2.2.3.RELEASE</spring.cloud.alibaba.version> <seata.seata-all.version>1.1.0</seata.seata-all.version> <spring-cloud-alibaba-seata.version>2.2.0.RELEASE</spring-cloud-alibaba-seata.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!-- druid阿里巴巴数据库连接池 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>1.1.10</version> </dependency> <!-- Mybatis --> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>1.3.2</version> </dependency> <!-- MySql数据库驱动 --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.15</version> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> <version>${spring.cloud.alibaba.version}</version> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-seata</artifactId> <version>${spring-cloud-alibaba-seata.version}</version> </dependency> <!-- 添加 seata starter ,与服务端保持一致--> <dependency> <groupId>io.seata</groupId> <artifactId>seata-spring-boot-starter</artifactId> <version>1.4.0</version> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-dependencies</artifactId> <version>${spring.cloud.alibaba.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
配置trade项目application.yml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 server: port: 8083 spring: application: name: trade datasource: druid: url: jdbc:mysql://localhost/test?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=CTT&allowPublicKeyRetrieval=true username: root password: rayewang. driver-class-name: com.mysql.cj.jdbc.Driver max-active: 20 initial-size: 1 min-idle: 3 max-wait: 60000 pool-prepared-statements: true test-while-idle: true time-between-eviction-runs-millis: 60000 min-evictable-idle-time-millis: 300000 filters: stat,wall,stat,slf4j,default web-stat-filter: enabled: true url-pattern: /* exclusions: "*.js,*.gif,*.jpg,*.bmp,*.png,*.css,*.ico,/druid/*" stat-view-servlet: enabled: true url-pattern: /druid/* reset-enable: true login-username: Raye login-password: 123456 filter: slf4j: enabled: true statement-create-after-log-enabled: false statement-log-enabled: false statement-executable-sql-log-enable: true statement-log-error-enabled: true result-set-log-enabled: false servlet: multipart: max-file-size: 10MB max-request-size: 100MB cloud: nacos: discovery: server-addr: 127.0.0.1:8848 group: SEATA_GROUP feign: client: config: default: connect-timeout: 10000 #单位毫秒 read-timeout : 10000 #单位毫秒 okhttp: enabled: true httpclient: enabled: true debug: false logging: level: druid: sql: Statement: DEBUG nacos: group: SEATA_GROUP namespace: public # 配置中心地址 server-addr: 127.0.0.1:8848 seata: application: seata-server tx-service-group: my_test_tx_group seata: enabled: true application-id: ${spring.application.name} tx-service-group: ${nacos.seata.tx-service-group} enable-auto-data-source-proxy: true config: # 指明类型 type: nacos nacos: server-addr: ${nacos.server-addr} namespace: ${nacos.namespace} group: ${nacos.group} username: "nacos" password: "nacos" registry: type: nacos nacos: application: ${nacos.seata.application} server-addr: ${nacos.server-addr} namespace: ${nacos.namespace} group: ${nacos.group} username: "nacos" password: "nacos"
其中分布式事务核心配置为nacos节点和seata节点的内容,seata节点内容配置大致跟seata-server的配置一致,其中tx-service-group的值就是我们之前手动添加的配置的节点最后一个名称,user项目的application.yml跟trade基本一致,除了spring.application.name和端口不同
Springcloud的核心配置为spring.cloud节点下的内容,主要是配置nacos为注册发现中心以及应用的分组,要跟Seata配置的分组一致
业务流程 项目创建好之后,我们先梳理整体流程,首先调用user项目的服务保存用户,然后在保存用户服务中开启分布式事务,保存用户后调用trade服务保存订单,订单保存完毕之后,判断用户的id是否是偶数,如果是偶数则抛出一个异常,看保存的用户和订单信息是否正常回滚,如果是奇数则看数据是否正常保存
编写trade项目相关代码 以下只列出逻辑相关代码
TradeMapper.java
1 2 3 4 5 6 7 8 9 package wang.raye.nacos; import org.apache.ibatis.annotations.Insert; public interface TradeMapper { @Insert({"insert into trade(userid,`value`) values(#{userid},#{value})"}) int insertTrade(int userid,int value); }
TradeService.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 package wang.raye.nacos; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @Service public class TradeService { @Autowired private TradeMapper mapper; @Transactional(rollbackFor = Exception.class) public boolean saveTrade(int userid,int value){ return mapper.insertTrade(userid,value) > 0; } }
TradeController.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 package wang.raye.nacos; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class TradeController { @Autowired private TradeService tradeService; @RequestMapping("trade/save") public boolean save(int userid,int value){ return tradeService.saveTrade(userid,value); } }
编写user相关代码 UserMapper.java
1 2 3 4 5 6 7 8 9 package wang.raye.nacos; import org.apache.ibatis.annotations.Insert; public interface UserMapper { @Insert({"insert user(id,name,passwords) values(#{id},#{name},#{passwords})"}) int insert(int id,String name,String passwords); }
UserService.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 package wang.raye.nacos; import io.seata.spring.annotation.GlobalTransactional; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @Service public class UserService { @Autowired private TradeService service; @Autowired private UserMapper mapper; @GlobalTransactional @Transactional(rollbackFor = Exception.class) public boolean save(int userid,String name,String passwords,int value){ mapper.insert(userid,name,passwords); service.save(userid,value); if(userid % 2 == 0){ throw new RuntimeException("不给保存双数id"); } return true; } }
其中这里主要用注解@GlobalTransactional来开启分布式事务,代码
1 2 3 if(userid % 2 == 0){ throw new RuntimeException("不给保存双数id"); }
只是为了模拟一下发生异常的情况,看是否会正常回滚,TradeService为Springcloud调用trade项目的
TradeService.java
1 2 3 4 5 6 7 8 9 10 11 12 13 package wang.raye.nacos; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestParam; @FeignClient("trade") public interface TradeService { @RequestMapping("trade/save") boolean save(@RequestParam("userid") int userid,@RequestParam("value") int value); }
UserController.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 package wang.raye.nacos; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RestController; @RestController public class UserController { @Autowired private UserService service; @GetMapping("save") public String save(int id,String name,String passwords,int value){ try { if (service.save(id, name, passwords, value)) { return "保存成功"; } return "保存失败"; }catch (Exception e){ return e.getMessage(); } } }
测试 至此业务代码已经完成,跟没用分布式事务的代码只有一个注解的差异,所以应该还是很好理解的,启动2个项目,我们来测试一下,访问http://localhost:8082/save?id=16&name=raye&passwords=1234566&value=100 (其中8082为user项目的端口),会提示不给保存双数id,并且保存的订单数据也会被删除掉,可以跟踪执行的sql,发现trade是先插入,然后再删除的,而用户是插入后回滚的,利用了本地事务,而将id改成奇数,发现能正常保存,说明AT模式的分布式事务已经搭建成功