一、关于

Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。

本文代码仓库地址:https://github.com/lazyrabb1t/rabb-springcloud-demo

二、事务

2.1 事务并发可能出现的情况

2.1.1 脏读(Dirty Read)

一个事务读到了另一个未提交事务修改过的数据

2.1.2 不可重复读(Non-Repeatable Read)

一个事务只能读到另一个已经提交的事务修改过的数据,并且其他事务每对该数据进行一次修改并提交后,该事务都能查询得到最新值。(不可重复读在读未提交和读已提交隔离级别都可能会出现)

2.1.3 幻读(Phantom)

一个事务先根据某些条件查询出一些记录,之后另一个事务又向表中插入了符合这些条件的记录,原先的事务再次按照该条件查询时,能把另一个事务插入的记录也读出来。(幻读在读未提交、读已提交、可重复读隔离级别都可能会出现)

2.2 事务的隔离级别

MySQL的事务隔离级别一共有四个,分别是读未提交、读已提交、可重复读以及可串行化。

MySQL的隔离级别的作用就是让事务之间互相隔离,互不影响,这样可以保证事务的一致性。

隔离级别比较:可串行化>可重复读>读已提交>读未提交

隔离级别对性能的影响比较:可串行化>可重复读>读已提交>读未提交

由此看出,隔离级别越高,所需要消耗的MySQL性能越大(如事务并发严重性),为了平衡二者,一般建议设置的隔离级别为可重复读,MySQL默认的隔离级别也是可重复读。

2.2.1 读未提交(READ UNCOMMITTED)

在读未提交隔离级别下,事务A可以读取到事务B修改过但未提交的数据。

可能发生脏读、不可重复读和幻读问题,一般很少使用此隔离级别。

2.2.2 读已提交(READ COMMITTED)

在读已提交隔离级别下,事务B只能在事务A修改过并且已提交后才能读取到事务B修改的数据。

读已提交隔离级别解决了脏读的问题,但可能发生不可重复读和幻读问题,一般很少使用此隔离级别。

2.2.3 可重复读(REPEATABLE READ)

在可重复读隔离级别下,事务B只能在事务A修改过数据并提交后,自己也提交事务后,才能读取到事务B修改的数据。

可重复读隔离级别解决了脏读和不可重复读的问题,但可能发生幻读问题。

2.2.4 可串行化(SERIALIZABLE)

各种问题(脏读、不可重复读、幻读)都不会发生,通过加锁实现(读锁和写锁)。

两个事务同时读的场景下不会加锁,其他都会加锁。

2.3 MYSQL隔离级别的实现原理

2.3.1 Mysql锁

按锁粒度分类:

  • 行级锁是mysql中锁定粒度最细的一种锁。表示只针对当前操作的行进行加锁。行级锁能大大减少数据库操作的冲突,其加锁粒度最小,但加锁的开销也最大。行级锁分为共享锁和排他锁
  • 表级锁是mysql中锁定粒度最大的一种锁,表示对当前操作的整张表加锁,它实现简单,资源消耗较少,被大部分mysql引擎支持。
  • 页级锁是 MySQL 中锁定粒度介于行级锁和表级锁中间的一种锁。表级锁速度快,但冲突多,行级冲突少,但速度慢。因此,采取了折衷的页级锁,一次锁定相邻的一组记录。

锁级别分类:

  • 读锁又称共享锁(shared lock),因此也叫S锁。共享锁指多个事务对于同一记录可以共享一把锁。多个事务都能访问到记录,但是只能读不能修改。
  • 写锁也叫排它锁(exclusive lock),简称X锁。排他锁意味着不能与其他锁并存,若一个事务获取了某一记录的排他锁,其他事务就不能再获取该记录的其他锁(包括共享锁和排他锁)。获取排他锁的事务可以读取和修改记录。
2.3.2 读未提交

不加锁,因此不支持事务隔离

2.3.2 串行化

读的时候加共享锁,也就是其他事务可以并发读,但是不能写。写的时候加排它锁,其他事务不能并发写也不能并发读。

2.3.2 可重复读

MySQL 的 InnoDB 引擎才支持事务,其中可重复读是默认的隔离级别。

为了解决不可重复读,MySQL 采用了 MVVC (多版本并发控制) 的方式。每条记录在更新的时候都会同时记录一条回滚操作(回滚操作日志undo log)。同一条记录在系统中可以存在多个版本,通过回滚(rollback操作),可以回到前一个状态的值

我们在数据库表中看到的一行记录可能实际上有多个版本,每个版本的记录除了有数据本身外,还要有一个表示版本的字段,记为 row trx_id,而这个字段就是使其产生的事务的 id,事务 ID 记为 transaction id,它在事务开始的时候向事务系统申请,按时间先后顺序递增。

可重复读是在事务开始的时候生成一个当前事务全局性的快照,而读提交则是每次执行语句的时候都重新生成一次快照。

对于一个快照来说,它能够读到那些版本数据,要遵循以下规则:

  • 当前事务内的更新,可以读到;
  • 版本未提交,不能读到;
  • 版本已提交,但是却在快照创建后提交的,不能读到;
  • 版本已提交,且是在快照创建前提交的,可以读到;

三、Seata事务模式

3.1 AT 模式

两阶段提交协议的演变:

  • 一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
  • 二阶段:提交异步化,非常快速地完成。回滚通过一阶段的回滚日志进行反向补偿。

3.2 TCC 模式

TCC 模式,不依赖于底层数据资源的事务支持:

  • 一阶段 prepare 行为:调用 自定义 的 prepare 逻辑。
  • 二阶段 commit 行为:调用 自定义 的 commit 逻辑。
  • 二阶段 rollback 行为:调用 自定义 的 rollback 逻辑。

相较于AT模式,AT 模式基于支持本地 ACID事务的关系型数据库,会自动生成回滚日志,不需要自定义业务逻辑实现回滚。

3.3 Saga 模式

Saga模式是SEATA提供的长事务解决方案,在Saga模式中,业务流程中每个参与者都提交本地事务,当出现某一个参与者失败则补偿前面已经成功的参与者,一阶段正向服务和二阶段补偿服务都由业务开发实现。

3.4 XA 模式

在 Seata 定义的分布式事务框架内,利用事务资源(数据库、消息服务等)对 XA 协议的支持,以 XA 协议的机制来管理分支事务的一种 事务模式。

四、使用

模拟一个下订单的过程,rabb-seata-order-servic为订单模块,rabb-seata-storage-service为商品库存模块,调用订单模块的接口下订单时,需要调用库存模块减少商品库存。

4.1 安装seata

4.1.1 下载seata

下载地址:https://github.com/seata/seata/releases

这里我使用的是1.4.2版本。

4.1.2 创建数据库

Server端存储模式(store.mode)现有file、db、redis三种,这里我使用的是db模式,因此需要先创建数据库,数据库名称为seata,然后导入建表脚本,脚本地址:https://github.com/seata/seata/blob/1.4.2/script/server/db/mysql.sql

4.1.3 修改配置

seata/conf/file.conf文件配置:

  • store块中mode改为db
  • db块中数据库连接配置根据实际环境进行配置
store {
  ## store mode: file、db、redis
  mode = "db"
  ## rsa decryption public key
  publicKey = ""
  ## file store property
  file {
    ## store location dir
    dir = "sessionStore"
    # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
    maxBranchSessionSize = 16384
    # globe session size , if exceeded throws exceptions
    maxGlobalSessionSize = 512
    # file buffer size , if exceeded allocate new buffer
    fileWriteBufferCacheSize = 16384
    # when recover batch read size
    sessionReloadReadSize = 100
    # async, sync
    flushDiskMode = async
  }

  ## database store property
  db {
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.
    datasource = "druid"
    ## mysql/oracle/postgresql/h2/oceanbase etc.
    dbType = "mysql"
    driverClassName = "com.mysql.cj.jdbc.Driver"
    ## if using mysql to store the data, recommend add rewriteBatchedStatements=true in jdbc connection param
    url = "jdbc:mysql://127.0.0.1:3306/seata?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai"
    user = "root"
    password = "123456"
    minConn = 5
    maxConn = 100
    globalTable = "global_table"
    branchTable = "branch_table"
    lockTable = "lock_table"
    queryLimit = 100
    maxWait = 5000
  }
  ...
}

seata/conf/registry.conf文件配置,这里我将注册中心以及配置中心都配到了nacos上:

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "nacos"

  nacos {
    application = "seata-server"
    serverAddr = "127.0.0.1:8848"
    group = "SEATA_GROUP"
    namespace = ""
    cluster = "default"
    username = "nacos"
    password = "nacos"
  }
  ...
}

config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "nacos"

  nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = ""
    group = "SEATA_GROUP"
    username = "nacos"
    password = "nacos"
    dataId = "seataServer.properties"
  }
  ...
}
4.1.5 nacos配置

启动nacos,创建一个dataId为seataServer.properties,group为SEATA_GROUP的配置,其值内容可以复制config.txt中内容(https://github.com/seata/seata/blob/1.4.2/script/config-center/config.txt),然后修改配置中```store.mode=DB```,以及```store.db```相关配置,最后保存即可。

dataId以及group可以通过修改registry.conf中配置中心相关内容进行自定义。

4.1.6 启动
seata-server.sh -h 127.0.0.1 -p 8091 -m db -n 1 -e test

windows直接双击seata-server.bat即可。

4.2 创建库存模块

4.2.1 添加依赖
  <dependencies>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>io.seata</groupId>
                    <artifactId>seata-spring-boot-starter</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--mybatisPlus-->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
        </dependency>
        <!--mybatis-plus代码生成器-->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-generator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.freemarker</groupId>
            <artifactId>freemarker</artifactId>
        </dependency>
        <!-- 引入druid连接池 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>xyz.lazyrabbit</groupId>
            <artifactId>rabb-common</artifactId>
        </dependency>
    </dependencies>
4.2.2 配置

bootstrap.yml,配置nacos连接信息:

spring:
  cloud:
    # 设置Nacos服务端配置
    nacos:
      server-addr: localhost:8848
      username: nacos
      password: nacos

application.yml,配置seata以及数据源:

server:
  port: 20502
spring:
  application:
    name: rabb-seata-storage-service
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1/seata_storage?useUnicode=true&characterEncoding=utf8&autoReconnect=true&allowMultiQueries=true
    username: root
    password: 123456
mybatis-plus:
  # xml文件位置
  mapper-locations:
    - classpath:mapper/*.xml
  # 实体类位置
  type-aliases-package: xyz.lazyrabbit.seata.storage.entity
  # 打印sql日志
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
seata:
  enabled: true
  application-id: ${spring.application.name}
  # 客户端和服务端在同一个事务组,其值与nacos配置中心中service.vgroupMapping对应
  tx-service-group: default_tx_group
  # 自动数据源代理
  enable-auto-data-source-proxy: true
  # 数据源代理模式(分布式事务方案)
  data-source-proxy-mode: AT
  # 事务群组,配置项值为TC集群名,需要与服务端保持一致,,其值与nacos配置中心中service.vgroupMapping对应
  service:
    vgroup-mapping:
      default_tx_group: default
  #整合nacos配置中心
  config:
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8848
      group: SEATA_GROUP
      #      namespace: 24712b7c-05ad-4b79-af97-1d202431f521
      data-id: seataServer.properties
  #整合nacos注册中心
  registry:
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8848
      group: SEATA_GROUP
      #      namespace: 24712b7c-05ad-4b79-af97-1d202431f521
      # 默认TC集群名
      cluster: default
      # 服务名,与服务端中registry.conf配置要一致
      application: seata-server
4.2.3 添加业务方法

以下是service以及controller方法:

@RestController
@RequestMapping("/storage/sku")
@Slf4j
public class SkuController {
    @Autowired
    SkuService skuService;

    @GetMapping("{id}")
    public Response manage(@PathVariable Integer id, @RequestParam Integer count) {
        log.info("操作库存,商品:{},数量:{}", id, count);
        skuService.manage(id, count);
        return ResponseUtils.success("商品:" + id + ",减少库存数量:" + count);
    }
}
@Service
public class SkuServiceImpl extends ServiceImpl<SkuMapper, Sku> implements SkuService {

    @Override
    public void manage(Integer skuId, Integer count) {
        Sku sku = this.getById(skuId);
        sku.setCount(sku.getCount() - count);
        this.saveOrUpdate(sku);
    }
}

4.3 创建订单模块

4.3.1 添加依赖

参照4.2.1

4.3.2 配置

参照4.2.2,其中数据库部分需要进行相应调整

4.3.3 定义feign
@FeignClient(value = "${rabb.service.storage-service}", fallbackFactory = StorageFallbackFactoryService.class)
public interface StorageService {

    @GetMapping("/storage/sku/{id}")
    Response manage(@PathVariable(value = "id") Integer id, @RequestParam(value = "count") Integer count);

}
4.3.4 业务方法
@Service
@Slf4j
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements OrderService {

    @Autowired
    StorageService storageService;

    @Override
    @GlobalTransactional
    public void place(String orderName, Integer skuId, Integer count) {
        // 减少库存
        Response response = storageService.manage(skuId, count);
        log.info("调用减少库存接口,返回结果:{}", response);
        // 创建订单
        Order order = new Order();
        order.setSkuId(skuId);
        order.setOrderName(orderName);
//        order.setCreateTime(LocalDateTime.now());
        this.saveOrUpdate(order);
        // 随机抛出异常
        if (new Random().nextBoolean()) {
            log.info("下单接口抛出异常");
            throw new RuntimeException("hhh");
        }
    }

    @Override
    @Transactional
    public void placeWithLocalTransaction(String orderName, Integer skuId, Integer count) {
        Order order = new Order();
        order.setSkuId(skuId);
        order.setOrderName(orderName);
//        order.setCreateTime(LocalDateTime.now());
        this.saveOrUpdate(order);
        int i = 10 / 0;
    }
}
@RestController
@RequestMapping("/order")
public class OrderController {

    @Autowired
    OrderService orderService;

    @GetMapping
    public Response place(Integer skuId, Integer count) {
        orderService.place(randomName(), skuId, count);
        return ResponseUtils.success("下单成功");
    }

    private String randomName() {
        return "订单_" + UUID.randomUUID().toString().replaceAll("-", "").toUpperCase();
    }
}

4.5 创建数据库

4.5.1 创建订单库

创建seata_order库,执行以下脚本,其中undo_log表脚本可在github下载,地址:https://github.com/seata/seata/blob/1.4.2/script/client/at/db/mysql.sql

SET FOREIGN_KEY_CHECKS=0;

DROP TABLE IF EXISTS `t_sku`;
CREATE TABLE `t_sku` (
  `id` int NOT NULL AUTO_INCREMENT COMMENT '商品ID',
  `sku_name` varchar(255) NOT NULL COMMENT '商品名称',
  `count` int NOT NULL DEFAULT '0' COMMENT '库存',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

INSERT INTO `t_sku` VALUES ('1', '奥迪', '1000');

DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
  `branch_id` bigint NOT NULL COMMENT 'branch transaction id',
  `xid` varchar(128) NOT NULL COMMENT 'global transaction id',
  `context` varchar(128) NOT NULL COMMENT 'undo_log context,such as serialization',
  `rollback_info` longblob NOT NULL COMMENT 'rollback info',
  `log_status` int NOT NULL COMMENT '0:normal status,1:defense status',
  `log_created` datetime(6) NOT NULL COMMENT 'create datetime',
  `log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='AT transaction mode undo table';
4.5.2 创建库存库

创建seata_storage库,执行以下脚本:

SET FOREIGN_KEY_CHECKS=0;

DROP TABLE IF EXISTS `t_order`;
CREATE TABLE `t_order` (
  `id` int NOT NULL AUTO_INCREMENT COMMENT '订单ID',
  `order_name` varchar(63) NOT NULL COMMENT '订单名称',
  `sku_id` int NOT NULL COMMENT '商品ID',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
  `branch_id` bigint NOT NULL COMMENT 'branch transaction id',
  `xid` varchar(128) NOT NULL COMMENT 'global transaction id',
  `context` varchar(128) NOT NULL COMMENT 'undo_log context,such as serialization',
  `rollback_info` longblob NOT NULL COMMENT 'rollback info',
  `log_status` int NOT NULL COMMENT '0:normal status,1:defense status',
  `log_created` datetime(6) NOT NULL COMMENT 'create datetime',
  `log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='AT transaction mode undo table';

4.6 测试

启动nacos、seata、订单模块以及库存模块。

调用订单模块的下单接口,当订单模块出现异常时,可以看到订单没有新增且库存没有减少则代表分布式事务启用成功。

4.7 问题

存在datetime类型的字段时,seata1.4.2版本无法进行回滚,会报Cannot construct instance of `java.time.LocalDateTime` 错误。

在官方仓库的Issues中也已经存在提交的相关问题:

参考

https://seata.io/zh-cn/docs/ops/deploy-guide-beginner.html

https://blog.csdn.net/sermonlizhi/article/details/123055175