Springcloud+Seata+nacos 分布式事务项目搭建 AT模式

前言

本文要求对Springcloud有一定了解,对分布式事务本身有一定认知,如果不了解微服务,建议先看看Spring Cloud的基本认识和使用Spring Cloud的基本教程,再回头学习本文

为什么会出现分布式事务

开发过程中,随着项目模块的增加以及分库分表的出现,传统事务已经无法满足业务需求,如分库,由于有多个数据源,而数据库事务又是基于数据库层,所以如果只用数据库原生事务,会导致数据库A成功提交,数据库B回滚,导致数据不一致,又比如多模块下,常见的订单流程,订单服务成功提交订单,调用库存服务扣减库存,由于是链式调用,库存成功扣减,然后回到订单服务时,出现异常,导致订单回滚,但是此时库存却未回滚,也会导致数据不一致,所以这些情况都需要分布式事务来解决这个问题(当然一般开发中,我们常用的做法是能避免就尽量避免,实在避免不了才使用分布式事务,因为分布式事务不管怎么样,性能,一致性,原子性等都会收到影响)

分布式事务目前的几种方案

2PC(二阶段提交)

3PC(三阶段提交)

TCC(Try - Confirm - Cancel)

最终一致性(消息队列等方式)

最大努力通知(数据不要求强一致)

每种方案都各有优劣,具体采用何种方案还需要根据实际业务场景来使用

初识Seata

Seata 是一款阿里巴巴开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。详情可以去查看seata官方文档

初始Nacos

Nacos 是一款阿里巴巴开源的服务注册中心,配置中心,管理中心,以下是nacos官网简介:

Nacos 致力于帮助您发现、配置和管理微服务。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据及流量管理。

Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。 Nacos 是构建以“服务”为中心的现代应用架构 (例如微服务范式、云原生范式) 的服务基础设施。

我们可以把nacos当作Eureka +Srpingcloud config的结合体,详情可以查去查看nacos官方文档

下载并部署Nacos

Nacos跟Eureka 不一样,Eureka是创建一个Eureka 的项目即可,但是Nacos需要专门下载Nacos的服务端,可以从下载地址去下载最新的稳定版,目前编写本文时最新版为2.0.0-ALPHA.2,下载好之后解压,进入bin目录,并根据系统执行相关的脚本启动服务,记得脚本后面加上参数

COPY
1
-m standalone

standalone代表着单机模式运行,非集群模式,由于本文只讲解单机模式,所以不用集群模式,集群模式还要配置其他东西,启动成功后浏览器访问http://localhost:8848/nacos,能看到登录界面,输入默认账号密码nacos(账号密码都是nacos),能进入首页,如下图

nacos图片

下载并部署Seata

下载并配置

https://github.com/seata/seata/releases,下载服务器软件包,将其解压缩,本文编写时最新版为1.4.0,进入conf目录,编辑registry.conf,配置Seata的registry和config为nacos模式,同时配置nacos相关配置,首先修改registry(注册中心)节点下的type值为nacos,然后配置register节点下的nacos节点。

旧配置:

COPY
1
2
3
4
5
6
7
8
9
nacos {
application = "seata-server"
serverAddr = "127.0.0.1:8848"
group = "SEATA_GROUP"
namespace = ""
cluster = "default"
username = ""
password = ""
}

新配置

COPY
1
2
3
4
5
6
7
8
9
nacos {
application = "seata-server"
serverAddr = "127.0.0.1:8848"
group = "SEATA_GROUP"
namespace = "public"
cluster = "default"
username = "nacos"
password = "nacos"
}

application 为Seata-server注册到nacos的应用名称,serverAddr 为nacos的地址,group为注册到nacos的应用分组,要使用seata,必须要seata-server跟其他使用使用seata分布式事务的应用在同一分组才行,namespace 为命名空间,我们使用默认的public,当然也可以创建其他命名空间来使用,username和password对应的事nacos的账号密码

修改config(配置中心)节点下type为nacos,同时修改config节点下的nacos节点

修改前配置

COPY
1
2
3
4
5
6
7
nacos {
serverAddr = "127.0.0.1:8848"
namespace = ""
group = "SEATA_GROUP"
username = ""
password = ""
}

修改后

COPY
1
2
3
4
5
6
7
nacos {
serverAddr = "127.0.0.1:8848"
namespace = "public"
group = "SEATA_GROUP"
username = "nacos"
password = "nacos"
}

添加Seata配置
进入nacos里面,添加一个事务分组的配置(常规来说要添加很多配置,不过只要添加了事务分组配置,就可以满足基本运行要求)

点击添加按钮新增一个配置

添加按钮

添加如下配置:

添加配置

其中配置名称

COPY
1
service.vgroupMapping.my_test_tx_group

中的my_test_tx_group是我们自定义的名称,到时候需要配置项目中,可以根据实际情况配置,Group一定要跟之前Seata中配置的group对应上,内容default要跟之前Seata中的register节点下的cluster对应上,点击发布保存配置

完整Seata配置(此步骤非必须)

以上配置只是实现分布式事务的最基本的配置,如果想要配置所有支持的参数配置,可以从github下载配置文件config.txt,或者创建config.txt文件并复制写入下面代码(下面代码不是实时更新,所以最好还是去github下载)

COPY
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableClientBatchSendRequest=false
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
service.vgroupMapping.my_test_tx_group=default
service.default.grouplist=127.0.0.1:8091
service.enableDegrade=false
service.disableGlobalTransaction=false
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=false
client.rm.tableMetaCheckerInterval=60000
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
store.mode=file
store.publicKey=
store.file.dir=file_store/data
store.file.maxBranchSessionSize=16384
store.file.maxGlobalSessionSize=512
store.file.fileWriteBufferCacheSize=16384
store.file.flushDiskMode=async
store.file.sessionReloadReadSize=100
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&rewriteBatchedStatements=true
store.db.user=username
store.db.password=password
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
store.redis.mode=single
store.redis.single.host=127.0.0.1
store.redis.single.port=6379
store.redis.maxConn=10
store.redis.minConn=1
store.redis.maxTotal=100
store.redis.database=0
store.redis.password=
store.redis.queryLimit=100
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k
log.exceptionRate=100
transport.serialization=seata
transport.compressor=none
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898

以上配置含义建议百度或者查看官方文档确定每个配置的含义,创建好配置文件后,需要导入到nacos中,目前官方支持2种脚本导入方式,一种是python,另外一种是linux脚本,可以去github下载或者直接复制自己创建,下载地址如下:python导入脚本和   linux导入脚本

其中linux导入脚本命令如下

COPY
1
nacos-config.sh -h 127.0.0.1 -p 8848  -g SEATA_GROUP -u nacos -w nacos

nacos-config.sh为脚本名字,如果有修改需要修改成对应的名称,-h后面是nacos的服务器地址,-p为nacos的端口地址,-u后面为nacos的用户名,-w后面为nacos的用户名密码,-g后面为配置的分组,也就是我们之前手动创建配置时填的Group对应的值。

注意:分组必须要在同一个分组内,也就是说Seata配置的分组和需要分布式事务的程序的分组以及Seata-server的分组必须要在同一个分组内

以上是完整配置的,但是如果只是想实现基本的分布式事务,可以忽略上面一步

创建表

首先Seata要处理事务,需要创建一个Seata事务的日志表(UNDO_LOG),所有库里面都需要这个表(如果每个项目连接的库不一致,那么每个项目连接的库中则都需要此表)

COPY
1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

其次创建我们的业务表,这里为了方便演示,我创建2个非常简单的表,一个用户表,一个订单表,我们保存用户时,就调用订单服务保存一个订单,当用户id为偶数时,用户服务抛异常,触发回滚(表结构非常简陋且不符合逻辑,只为演示作)

COPY
1
2
3
4
5
6
CREATE TABLE `user` (
`id` int NOT NULL AUTO_INCREMENT,
`name` varchar(32) NOT NULL,
`passwords` varchar(32) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8;
COPY
1
2
3
4
5
6
CREATE TABLE `trade` (
`id` int NOT NULL AUTO_INCREMENT,
`userid` int NOT NULL,
`value` int NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8;

创建项目

创建一个user和trade的maven项目,其中pom依赖一致

COPY
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
  <properties>
<java.version>1.8</java.version>
<spring-cloud.version>Hoxton.SR9</spring-cloud.version>
<spring.cloud.alibaba.version>2.2.3.RELEASE</spring.cloud.alibaba.version>
<seata.seata-all.version>1.1.0</seata.seata-all.version>
<spring-cloud-alibaba-seata.version>2.2.0.RELEASE</spring-cloud-alibaba-seata.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- druid阿里巴巴数据库连接池 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.10</version>
</dependency>
<!-- Mybatis -->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.3.2</version>
</dependency>
<!-- MySql数据库驱动 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.15</version>
</dependency>

<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<version>${spring.cloud.alibaba.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>

<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-seata</artifactId>
<version>${spring-cloud-alibaba-seata.version}</version>
</dependency>
<!-- 添加 seata starter ,与服务端保持一致-->
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.4.0</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring.cloud.alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

配置trade项目application.yml

COPY
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97

server:
port: 8083
spring:
application:
name: trade
datasource:
druid:
url: jdbc:mysql://localhost/test?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=CTT&allowPublicKeyRetrieval=true
username: root
password: rayewang.
driver-class-name: com.mysql.cj.jdbc.Driver

max-active: 20
initial-size: 1
min-idle: 3
max-wait: 60000
pool-prepared-statements: true
test-while-idle: true
time-between-eviction-runs-millis: 60000
min-evictable-idle-time-millis: 300000
filters: stat,wall,stat,slf4j,default
web-stat-filter:
enabled: true
url-pattern: /*
exclusions: "*.js,*.gif,*.jpg,*.bmp,*.png,*.css,*.ico,/druid/*"
stat-view-servlet:
enabled: true
url-pattern: /druid/*
reset-enable: true
login-username: Raye
login-password: 123456
filter:
slf4j:
enabled: true
statement-create-after-log-enabled: false
statement-log-enabled: false
statement-executable-sql-log-enable: true
statement-log-error-enabled: true
result-set-log-enabled: false
servlet:
multipart:
max-file-size: 10MB
max-request-size: 100MB
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
group: SEATA_GROUP
feign:
client:
config:
default:
connect-timeout: 10000 #单位毫秒
read-timeout : 10000 #单位毫秒
okhttp:
enabled: true
httpclient:
enabled: true
debug: false
logging:
level:
druid:
sql:
Statement: DEBUG
nacos:
group: SEATA_GROUP
namespace: public
# 配置中心地址
server-addr: 127.0.0.1:8848
seata:
application: seata-server
tx-service-group: my_test_tx_group

seata:
enabled: true
application-id: ${spring.application.name}
tx-service-group: ${nacos.seata.tx-service-group}
enable-auto-data-source-proxy: true
config:
# 指明类型
type: nacos
nacos:
server-addr: ${nacos.server-addr}
namespace: ${nacos.namespace}
group: ${nacos.group}
username: "nacos"
password: "nacos"
registry:
type: nacos
nacos:
application: ${nacos.seata.application}
server-addr: ${nacos.server-addr}
namespace: ${nacos.namespace}
group: ${nacos.group}
username: "nacos"
password: "nacos"

其中分布式事务核心配置为nacos节点和seata节点的内容,seata节点内容配置大致跟seata-server的配置一致,其中tx-service-group的值就是我们之前手动添加的配置的节点最后一个名称,user项目的application.yml跟trade基本一致,除了spring.application.name和端口不同

Springcloud的核心配置为spring.cloud节点下的内容,主要是配置nacos为注册发现中心以及应用的分组,要跟Seata配置的分组一致

业务流程

项目创建好之后,我们先梳理整体流程,首先调用user项目的服务保存用户,然后在保存用户服务中开启分布式事务,保存用户后调用trade服务保存订单,订单保存完毕之后,判断用户的id是否是偶数,如果是偶数则抛出一个异常,看保存的用户和订单信息是否正常回滚,如果是奇数则看数据是否正常保存

编写trade项目相关代码

以下只列出逻辑相关代码

TradeMapper.java

COPY
1
2
3
4
5
6
7
8
9
package wang.raye.nacos;

import org.apache.ibatis.annotations.Insert;
public interface TradeMapper {

@Insert({"insert into trade(userid,`value`) values(#{userid},#{value})"})
int insertTrade(int userid,int value);
}

TradeService.java

COPY
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package wang.raye.nacos;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class TradeService {

@Autowired
private TradeMapper mapper;

@Transactional(rollbackFor = Exception.class)
public boolean saveTrade(int userid,int value){
return mapper.insertTrade(userid,value) > 0;
}
}

TradeController.java

COPY
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package wang.raye.nacos;


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TradeController {

@Autowired
private TradeService tradeService;

@RequestMapping("trade/save")
public boolean save(int userid,int value){
return tradeService.saveTrade(userid,value);
}
}

编写user相关代码

UserMapper.java

COPY
1
2
3
4
5
6
7
8
9
package wang.raye.nacos;

import org.apache.ibatis.annotations.Insert;

public interface UserMapper {

@Insert({"insert user(id,name,passwords) values(#{id},#{name},#{passwords})"})
int insert(int id,String name,String passwords);
}

UserService.java

COPY
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package wang.raye.nacos;

import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class UserService {
@Autowired
private TradeService service;
@Autowired
private UserMapper mapper;

@GlobalTransactional
@Transactional(rollbackFor = Exception.class)
public boolean save(int userid,String name,String passwords,int value){
mapper.insert(userid,name,passwords);
service.save(userid,value);
if(userid % 2 == 0){
throw new RuntimeException("不给保存双数id");
}
return true;
}
}

其中这里主要用注解@GlobalTransactional来开启分布式事务,代码

COPY
1
2
3
if(userid % 2 == 0){
throw new RuntimeException("不给保存双数id");
}

只是为了模拟一下发生异常的情况,看是否会正常回滚,TradeService为Springcloud调用trade项目的

TradeService.java

COPY
1
2
3
4
5
6
7
8
9
10
11
12
13
package wang.raye.nacos;

import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;

@FeignClient("trade")
public interface TradeService {

@RequestMapping("trade/save")
boolean save(@RequestParam("userid") int userid,@RequestParam("value") int value);
}

UserController.java

COPY
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package wang.raye.nacos;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class UserController {
@Autowired
private UserService service;

@GetMapping("save")
public String save(int id,String name,String passwords,int value){
try {
if (service.save(id, name, passwords, value)) {
return "保存成功";
}
return "保存失败";
}catch (Exception e){
return e.getMessage();
}
}
}

测试

至此业务代码已经完成,跟没用分布式事务的代码只有一个注解的差异,所以应该还是很好理解的,启动2个项目,我们来测试一下,访问http://localhost:8082/save?id=16&name=raye&passwords=1234566&value=100(其中8082为user项目的端口),会提示不给保存双数id,并且保存的订单数据也会被删除掉,可以跟踪执行的sql,发现trade是先插入,然后再删除的,而用户是插入后回滚的,利用了本地事务,而将id改成奇数,发现能正常保存,说明AT模式的分布式事务已经搭建成功

Authorship: 作者
Article Link: https://raye.wang/2021/02/26/Springcloud-Seata-nacos-%E5%88%86%E5%B8%83%E5%BC%8F%E4%BA%8B%E5%8A%A1%E9%A1%B9%E7%9B%AE%E6%90%AD%E5%BB%BA-AT%E6%A8%A1%E5%BC%8F/
Copyright: All posts on this blog are licensed under the CC BY-NC-SA 4.0 license unless otherwise stated. Please cite Raye Blog !