SpringCloud整合Seata解决分布式事务(搭建+源码)

SpringCloud 整合Seata 解决分布式事务(搭建+源码)

作者:Thinking曹 2020-12-09 09:14:57

开发

前端

分布式 本篇Spring Cloud整合Seata之前,你必须要了解一下Spring Cloud Alibaba与Spring Boot、Spring Cloud之间的版本对应关系。

成都创新互联公司主要从事成都网站建设、做网站、网页设计、企业做网站、公司建网站等业务。立足成都服务乌拉特中,10余年网站建设经验,价格优惠、服务专业,欢迎来电咨询建站服务:028-86922220

 seata官网:http://seata.io/zh-cn/

前言

在当下微服务架构比较火热时,新一代微服务解决方案Spring Cloud Alibaba提供的开源分布式事务解决框架Seata无疑成为了我们在解决分布式事务时的首要之选,前面两篇文章分别介绍了常见的分布式解决方案和成熟的框架以及关于Seata概念的入门介绍,没有过分布式事务处理的小伙伴可以先有个大致的入门了解:

  • SpringCloud Alibaba微服务架构(十一)- 常见分布式事务解决方案及理论基础篇
  • SpringCloud Alibaba微服务架构(十二)- 分布式事务解决框架之Seata概念入门篇

那么在本篇Spring Cloud整合Seata之前,你必须要了解一下Spring Cloud Alibaba与Spring Boot、Spring Cloud之间的版本对应关系。

版本选择: Spring Cloud Alibaba与Spring Boot、Spring Cloud版本对应关系

一、版本要求

坑点1: 如果项目中使用了druid数据库连接池,引入的是SpringBoot的Starter依赖druid-spring-boot-starter,那么需要把druid-spring-boot-starter依赖换成druid1.1.23,因为seata源码中引入的druid依赖跟druid-spring-boot-starter的自动装配类冲突了,冲突的情况下项目启动出现异常,异常如下:

二、整合Seata环境配置

1. 下载seata-server-1.2.0和seata-1.2.0源码

seate-server下载: https://seata.io/zh-cn/blog/download.html,下载我们需要使用的seata1.2压缩包。

seata-1.2.0源码下载: https://github.com/seata/seata/releases

在这里插入图片描述

2. 创建undo_log日志表

在seata1.2源码seata-1.2.0\script\client\at\db目录下有提供针对mysql、oracle、postgresql这三种数据库生成undo-log逆向日志回滚表的表创建脚本。

  • 在你项目的参与全局事务的数据库中加入undo_log这张表。undo_log表脚本根据自身数据库类型来选择。
  
 
 
 
  1. -- for AT mode you must to init this sql for you business database. the seata server not need it. 
  2. CREATE TABLE IF NOT EXISTS `undo_log` 
  3.     `branch_id`     BIGINT(20)   NOT NULL COMMENT 'branch transaction id', 
  4.     `xid`           VARCHAR(100) NOT NULL COMMENT 'global transaction id', 
  5.     `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization', 
  6.     `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info', 
  7.     `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status', 
  8.     `log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime', 
  9.     `log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime', 
  10.     UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`) 
  11. ) ENGINE = InnoDB 
  12.   AUTO_INCREMENT = 1 
  13.   DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table'; 

3.创建seata事务相关表

下载Seata1.2的源码后解压如上图,目前支持mysql、oracle、postgresql这三种数据库,上述三种脚本是针对Seata的Sever端在协调处理分布式事务时所需要的3张表,提供了不同数据库的global_table表、branch_table表、lock_table表创建脚本,根据自身数据库执行对应的sql脚本执行即可。

这里以mysql为例,在你的mysql数据库中创建名为seata的库,并执行以下sql,将会生成三张表:

  
 
 
 
  1. -- -------------------------------- The script used when storeMode is 'db' -------------------------------- 
  2. -- the table to store GlobalSession data 
  3. CREATE TABLE IF NOT EXISTS `global_table` 
  4.     `xid`                       VARCHAR(128) NOT NULL, 
  5.     `transaction_id`            BIGINT, 
  6.     `status`                    TINYINT      NOT NULL, 
  7.     `application_id`            VARCHAR(32), 
  8.     `transaction_service_group` VARCHAR(32), 
  9.     `transaction_name`          VARCHAR(128), 
  10.     `timeout`                   INT, 
  11.     `begin_time`                BIGINT, 
  12.     `application_data`          VARCHAR(2000), 
  13.     `gmt_create`                DATETIME, 
  14.     `gmt_modified`              DATETIME, 
  15.     PRIMARY KEY (`xid`), 
  16.     KEY `idx_gmt_modified_status` (`gmt_modified`, `status`), 
  17.     KEY `idx_transaction_id` (`transaction_id`) 
  18. ) ENGINE = InnoDB 
  19.   DEFAULT CHARSET = utf8; 
  20.  
  21. -- the table to store BranchSession data 
  22. CREATE TABLE IF NOT EXISTS `branch_table` 
  23.     `branch_id`         BIGINT       NOT NULL, 
  24.     `xid`               VARCHAR(128) NOT NULL, 
  25.     `transaction_id`    BIGINT, 
  26.     `resource_group_id` VARCHAR(32), 
  27.     `resource_id`       VARCHAR(256), 
  28.     `branch_type`       VARCHAR(8), 
  29.     `status`            TINYINT, 
  30.     `client_id`         VARCHAR(64), 
  31.     `application_data`  VARCHAR(2000), 
  32.     `gmt_create`        DATETIME(6), 
  33.     `gmt_modified`      DATETIME(6), 
  34.     PRIMARY KEY (`branch_id`), 
  35.     KEY `idx_xid` (`xid`) 
  36. ) ENGINE = InnoDB 
  37.   DEFAULT CHARSET = utf8; 
  38.  
  39. -- the table to store lock data 
  40. CREATE TABLE IF NOT EXISTS `lock_table` 
  41.     `row_key`        VARCHAR(128) NOT NULL, 
  42.     `xid`            VARCHAR(96), 
  43.     `transaction_id` BIGINT, 
  44.     `branch_id`      BIGINT       NOT NULL, 
  45.     `resource_id`    VARCHAR(256), 
  46.     `table_name`     VARCHAR(32), 
  47.     `pk`             VARCHAR(36), 
  48.     `gmt_create`     DATETIME, 
  49.     `gmt_modified`   DATETIME, 
  50.     PRIMARY KEY (`row_key`), 
  51.     KEY `idx_branch_id` (`branch_id`) 
  52. ) ENGINE = InnoDB 
  53.   DEFAULT CHARSET = utf8; 

4. 项目中引入seata依赖

4.1 如果微服务是SpringCloud

  
 
 
 
  1.  
  2.  
  3.  
  4.    com.alibaba.cloud 
  5.    spring-cloud-starter-alibaba-seata 
  6.    2.1.3.RELEASE 
  7.     
  8.       
  9.         io.seata 
  10.         seata-spring-boot-starter 
  11.          
  12.     
  13.  
  14.  
  15.     io.seata 
  16.     seata-spring-boot-starter 
  17.     1.2.0 
  18.  
  19.  

4.2 如果微服务是Dubbo

  
 
 
 
  1.  
  2.     io.seata 
  3.     seata-spring-boot-starter 
  4.     1.2.0 
  5.  

5. 更改seata-server中的registry.conf

配置registry.conf注册中心为nacos,配置nacos相关属性参数。

  
 
 
 
  1. ##配置seata-server的注册中心,支持file 、nacos 、eureka、redis、zk、consul、etcd3、sofa 
  2. registry { 
  3.   # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa 
  4.   type = "nacos" 
  5.  
  6.   nacos { 
  7.     application = "seata-server" 
  8.     serverAddr = "127.0.0.1:8848" 
  9.     group = "SEATA_GROUP" 
  10.     namespace = "public" 
  11.     username = "nacos" 
  12.     cluster = "default" 
  13.     password = "nacos" 
  14.   } 
  15.    
  16.   file { 
  17.     name = "file.conf" 
  18.   } 
  19.  
  20. ##配置seata-server的配置中心,支持file、nacos 、apollo、zk、consul、etcd3 
  21. config { 
  22.   # file、nacos 、apollo、zk、consul、etcd3 
  23.   type = "nacos" 
  24.  
  25.   nacos { 
  26.     serverAddr = "127.0.0.1:8848" 
  27.     namespace = "public" 
  28.     group = "SEATA_GROUP" 
  29.     username = "nacos" 
  30.     password = "nacos" 
  31.   } 
  32.   
  33.   file { 
  34.     name = "file.conf" 
  35.   } 

6. 修改seata-server中的file.config

配置file.config的DB模式相关参数配置。

  
 
 
 
  1. ##配置seata-server的数据存储方式,支持本地文档和数据库。 
  2. ## transaction log store, only used in seata-server 
  3. store { 
  4.   ## store mode: file、db、redis 
  5.   mode = "db" 
  6.  
  7.   ## file store property 
  8.   file { 
  9.     ## store location dir 
  10.     dir = "sessionStore" 
  11.     # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions 
  12.     maxBranchSessionSize = 16384 
  13.     # globe session size , if exceeded throws exceptions 
  14.     maxGlobalSessionSize = 512 
  15.     # file buffer size , if exceeded allocate new buffer 
  16.     fileWriteBufferCacheSize = 16384 
  17.     # when recover batch read size 
  18.     sessionReloadReadSize = 100 
  19.     # async, sync 
  20.     flushDiskMode = async 
  21.   } 
  22.  
  23.   ## database store property 
  24.   db { 
  25.     ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc. 
  26.     datasource = "druid" 
  27.     ## mysql/oracle/postgresql/h2/oceanbase etc. 
  28.     dbType = "mysql" 
  29.     driverClassName = "com.mysql.jdbc.Driver" 
  30.     url = "jdbc:mysql://127.0.0.1:3306/seata" 
  31.     user = "root" 
  32.     password = "root" 
  33.     minConn = 5 
  34.     maxConn = 30 
  35.     globalTable = "global_table" 
  36.     branchTable = "branch_table" 
  37.     lockTable = "lock_table" 
  38.     queryLimit = 100 
  39.     maxWait = 5000 
  40.   } 
  41.  
  42.   ## redis store property 
  43.   redis { 
  44.     host = "127.0.0.1" 
  45.     port = "6379" 
  46.     password = "" 
  47.     database = "0" 
  48.     minConn = 1 
  49.     maxConn = 10 
  50.     queryLimit = 100 
  51.   } 

7. 修改提交nacos脚本到nacos控制台

运行你下载的nacos,并参考:https://github.com/seata/seata/tree/develop/script/config-center 下的config.txt文件并修改:

  
 
 
 
  1. service.vgroupMapping.my_test_tx_group=default 
  2. store.mode=db 
  3. store.db.datasource=druid 
  4. store.db.dbType=mysql 
  5. store.db.driverClassName=com.mysql.jdbc.Driver 
  6. store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true 
  7. store.db.user=username 
  8. store.db.password=password 
  9. store.db.minConn=5 
  10. store.db.maxConn=30 
  11. store.db.globalTable=global_table 
  12. store.db.branchTable=branch_table 
  13. store.db.queryLimit=100 
  14. store.db.lockTable=lock_table 
  15. store.db.maxWait=5000 

运行仓库:https://github.com/seata/seata/tree/develop/script/config-center/nacos 中提供的nacos脚本nacos-config.sh,将以上信息提交到nacos控制台,如果有需要修改参数,可直接通过登录nacos控制台修改。

操作如下图:

8. application.yml配置

从官方github仓库:https://github.com/seata/seata/tree/develop/script/client 拿到参考配置做修改,加到你项目的application.yml文件中。

  
 
 
 
  1. #Seata分布式事务配置(AT模式) 
  2. seata: 
  3.   enabled: true 
  4.   application-id: ${spring.application.name} 
  5.   #客户端和服务端在同一个事务组 
  6.   tx-service-group: my_test_tx_group 
  7.   enable-auto-data-source-proxy: true 
  8.   service: 
  9.     vgroup-mapping: 
  10.       my_test_tx_group: default 
  11.   config: 
  12.     type: nacos 
  13.     nacos: 
  14.       namespace: "public" 
  15.       serverAddr: 127.0.0.1:8848 
  16.       group: SEATA_GROUP 
  17.       username: "nacos" 
  18.       password: "nacos" 
  19.   #服务注册到nacos 
  20.   registry: 
  21.     type: nacos 
  22.     nacos: 
  23.       application: seata-server 
  24.       server-addr: 127.0.0.1:8848 
  25.       group: SEATA_GROUP 
  26.       namespace: "public" 
  27.       username: "nacos" 
  28.       password: "nacos" 
  29.       cluster: default 

9. 运行seata-server

启动运行seata-server,成功后,运行自己的服务提供者,服务参与者。在全局事务调用者(发起全局事务的服务)的接口上加入@GlobalTransactional注解

到此为止,整合SpringCloud整合seata1.2及seata1.2整合nacos的配置与注册中心全部整合完成了。

三、项目准备

如果你经过前面的步骤搭建Seata环境完成了,那么你可以尝试一下启动项目,控制台无异常则搭建成功。

那么下面准备以Seata官方文档上的一个经典例子为题,模拟用户下单,创建订单同时扣减库存数量这一过程中产生的分布式事务问题,然后使用Seata解决,正好使用以下Seata的特性。

1. 订单服务

  • OrderController
  
 
 
 
  1. /** 
  2.  * @desc:  订单服务 
  3.  * @author: cao_wencao 
  4.  * @date: 2020-09-22 23:27 
  5.  */ 
  6. @RestController 
  7. @Slf4j 
  8. @RequestMapping("/order") 
  9. public class OrderController { 
  10.  
  11.     @Autowired 
  12.     private OrderServiceImpl orderService; 
  13.  
  14.     /** 
  15.      * 用户购买下单,模拟全局事务提交 
  16.      * @param pid 
  17.      * @return 
  18.      */ 
  19.     @RequestMapping("/purchase/commit/{pid}") 
  20.     public Order orderCommit(@PathVariable("pid") Integer pid) { 
  21.         return orderService.createOrderCommit(pid); 
  22.     } 
  23.  
  24.     /** 
  25.      * 用户购买下单,模拟全局事务回滚 
  26.      * @param pid 
  27.      * @return 
  28.      */ 
  29.     @RequestMapping("/purchase/rollback/{pid}") 
  30.     public Order orderRollback(@PathVariable("pid") Integer pid) { 
  31.         return orderService.createOrderRollback(pid); 
  32.     } 
  33.  
  • OrderServiceImpl
  
 
 
 
  1. /** 
  2.  * @desc: 
  3.  * @author: cao_wencao 
  4.  * @date: 2020-09-22 23:30 
  5.  */ 
  6. @Service 
  7. @Slf4j 
  8. public class OrderServiceImpl { 
  9.     @Autowired 
  10.     private OrderDao orderDao; 
  11.  
  12.     @Autowired 
  13.     private ProductService productService; 
  14.  
  15.     //用户下单,模拟全局事务提交 
  16.     public Order createOrderCommit(Integer pid) { 
  17.         log.info("接收到{}号商品的下单请求,接下来调用商品微服务查询此商品信息", pid); 
  18.  
  19.         //1 调用商品微服务,查询商品信息 
  20.         Product product = productService.findByPid(pid); 
  21.         log.info("查询到{}号商品的信息,内容是:{}", pid, JSON.toJSONString(product)); 
  22.  
  23.         //2 下单(创建订单) 
  24.         Order order = new Order(); 
  25.         order.setUid(1); 
  26.         order.setUsername("测试用户"); 
  27.         order.setPid(pid); 
  28.         order.setPname(product.getPname()); 
  29.         order.setPprice(product.getPprice()); 
  30.         order.setNumber(1); 
  31.         orderDao.save(order); 
  32.         log.info("创建订单成功,订单信息为{}", JSON.toJSONString(order)); 
  33.  
  34.         //3 扣库存m 
  35.         productService.reduceInventoryCommit(pid, order.getNumber()); 
  36.  
  37.         return order; 
  38.     } 
  39.  
  40.     //用户下单,模拟全局事务回滚 
  41.     @GlobalTransactional//全局事务控制 
  42.     public Order createOrderRollback(Integer pid) { 
  43.         log.info("接收到{}号商品的下单请求,接下来调用商品微服务查询此商品信息", pid); 
  44.  
  45.         //1 调用商品微服务,查询商品信息 
  46.         Product product = productService.findByPid(pid); 
  47.         log.info("查询到{}号商品的信息,内容是:{}", pid, JSON.toJSONString(product)); 
  48.  
  49.         //2 下单(创建订单) 
  50.         Order order = new Order(); 
  51.         order.setUid(1); 
  52.         order.setUsername("测试用户"); 
  53.         order.setPid(pid); 
  54.         order.setPname(product.getPname()); 
  55.         order.setPprice(product.getPprice()); 
  56.         order.setNumber(1); 
  57.         orderDao.save(order); 
  58.         log.info("创建订单成功,订单信息为{}", JSON.toJSONString(order)); 
  59.  
  60.         //3 扣库存m 
  61.         productService.reduceInventoryRollback(pid, order.getNumber()); 
  62.  
  63.         return order; 
  64.     } 
  65.  
  • 商品服务的Feign类ProductService
  
 
 
 
  1. /** 
  2.  * @desc: 
  3.  * @author: cao_wencao 
  4.  * @date: 2020-09-22 23:43 
  5.  */ 
  6. @FeignClient(value = "product-service",configuration = FeignRequestInterceptor.class) 
  7. public interface ProductService { 
  8.     //@FeignClient的value +  @RequestMapping的value值  其实就是完成的请求地址  "http://product-service/product/" + pid 
  9.     //指定请求的URI部分 
  10.     @RequestMapping("/product/product/{pid}") 
  11.     Product findByPid(@PathVariable Integer pid); 
  12.  
  13.     //扣减库存,模拟全局事务提交 
  14.     //参数一: 商品标识 
  15.     //参数二:扣减数量 
  16.     @RequestMapping("/product/reduceInventory/commit") 
  17.     void reduceInventoryCommit(@RequestParam("pid") Integer pid, 
  18.                                @RequestParam("number") Integer number); 
  19.  
  20.     //扣减库存,模拟全局事务回滚 
  21.     //参数一: 商品标识 
  22.     //参数二:扣减数量 
  23.     @RequestMapping("/product/reduceInventory/rollback") 
  24.     void reduceInventoryRollback(@RequestParam("pid") Integer pid, 
  25.                          @RequestParam("number") Integer number); 
  26.  

2. 商品服务

  • ProductController
  
 
 
 
  1. /** 
  2.  * @desc: 
  3.  * @author: cao_wencao 
  4.  * @date: 2020-09-22 23:16 
  5.  */ 
  6. @RestController 
  7. @Slf4j 
  8. @RequestMapping("/product") 
  9. public class ProductController { 
  10.  
  11.     @Autowired 
  12.     private ProductService productService; 
  13.  
  14.     /** 
  15.      * 扣减库存,正常->模拟全局事务提交 
  16.      * @param pid 
  17.      * @param number 
  18.      */ 
  19.     @RequestMapping("/reduceInventory/commit") 
  20.     public void reduceInventoryCommit(Integer pid, Integer number) { 
  21.         String token = ServletUtils.getRequest().getHeader("token"); 
  22.         log.info("从head请求头透传过来的值为token:"+ token); 
  23.         productService.reduceInventoryCommit(pid, number); 
  24.     } 
  25.  
  26.     /** 
  27.      * 扣减库存,异常->模拟全局事务回滚 
  28.      * @param pid 
  29.      * @param number 
  30.      */ 
  31.     @RequestMapping("/reduceInventory/rollback") 
  32.     public void reduceInventoryRollback(Integer pid, Integer number) { 
  33.         productService.reduceInventoryRollback(pid, number); 
  34.     } 
  35.  
  36.     //商品信息查询 
  37.     @RequestMapping("/product/{pid}") 
  38.     public Product product(@PathVariable("pid") Integer pid) { 
  39.         log.info("接下来要进行{}号商品信息的查询", pid); 
  40.         Product product = productService.findByPid(pid); 
  41.         log.info("商品信息查询成功,内容为{}", JSON.toJSONString(product)); 
  42.         return product; 
  43.     } 
  • ProductService接口类
  
 
 
 
  1. /** 
  2.  * @desc:  商品接口 
  3.  * @author: cao_wencao 
  4.  * @date: 2020-09-22 23:18 
  5.  */ 
  6. public interface ProductService { 
  7.     //根据pid查询商品信息 
  8.     Product findByPid(Integer pid); 
  9.  
  10.     //扣减库存,正常->模拟全局事务提交 
  11.     void reduceInventoryCommit(Integer pid, Integer number); 
  12.  
  13.     //扣减库存,异常->模拟全局事务回滚 
  14.     void reduceInventoryRollback(Integer pid, Integer number); 
  • ProductServiceImpl 接口实现类
  
 
 
 
  1. /** 
  2.  * @desc:  商品服务实现类 
  3.  * @author: cao_wencao 
  4.  * @date: 2020-09-22 23:20 
  5.  */ 
  6. @Service 
  7. public class ProductServiceImpl implements ProductService { 
  8.  
  9.     @Autowired 
  10.     private ProductDao productDao; 
  11.  
  12.     @Override 
  13.     public Product findByPid(Integer pid) { 
  14.         return productDao.findById(pid).get(); 
  15.     } 
  16.  
  17.     /** 
  18.      * 扣减库存,正常->模拟全局事务提交 
  19.      * @param pid 
  20.      * @param number 
  21.      */ 
  22.     @Override 
  23.     public void reduceInventoryCommit(Integer pid, Integer number) { 
  24.         //查询 
  25.         Product product = productDao.findById(pid).get(); 
  26.         //省略校验 
  27.  
  28.         //内存中扣减 
  29.         product.setStock(product.getStock() - number); 
  30.  
  31.         //保存扣减库存 
  32.         productDao.save(product); 
  33.     } 
  34.  
  35.     /** 
  36.      * 扣减库存,异常->模拟全局事务回滚 
  37.      * @param pid 
  38.      * @param number 
  39.      */ 
  40.     @Transactional(rollbackFor = Exception.class)  //服务提供方本地事务注解 
  41.     @Override 
  42.     public void reduceInventoryRollback(Integer pid, Integer number) { 
  43.         //查询 
  44.         Product product = productDao.findById(pid).get(); 
  45.         //省略校验 
  46.  
  47.         //内存中扣减 
  48.         product.setStock(product.getStock() - number); 
  49.  
  50.         //模拟异常 
  51.         int i = 1 / 0; 
  52.  
  53.         //保存扣减库存 
  54.         productDao.save(product); 
  55.     } 

四、参考文档

seata官网:

  • http://seata.io/zh-cn/

Seata常见问题:

  • http://seata.io/zh-cn/docs/overview/faq.html

Seata整合1.2教程:

  • https://www.bilibili.com/video/BV12Q4y1A7Nt

升级1.3教程:

  • https://www.bilibili.com/video/BV1Cf4y1X7vR
  • https: //mp.weixin.qq.com/s/2KSidJ72YsovpJ94P1aK1g

springcloud整合demo:

  • https://gitee.com/itCjb/spring-cloud-alibaba-seata-demo

五、完整源码

  • https://github.com/Thinkingcao/SpringCloudLearning/tree/master/springcloud-seata

名称栏目:SpringCloud整合Seata解决分布式事务(搭建+源码)
本文链接:http://www.shufengxianlan.com/qtweb/news1/527251.html

网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联