1. 用例说明

用户购买商品的业务逻辑。整个业务逻辑由3个微服务提供支持:

  • 仓储服务:对给定的商品扣除仓储数量。
  • 订单服务:根据采购需求创建订单。
  • 帐户服务:从用户帐户中扣除余额。

1.1 项目的架构图

请添加图片描述
请添加图片描述

1.2 初始项目搭建

1. 环境介绍

每个模块一个库,也就是需要4个库。

模块名称项目端口号数据库端口号
business-xa80845237
storage-xa80815238
order-xa80825239
account-xa80835240

2. 初始表数据

-- 1. business-xa模块所在数据库  新建BIZ_LOG表
CREATE TABLE "SYSDBA"."BIZ_LOG"
(
"ID_" VARCHAR(64) NOT NULL,
"OP_DATETIME" DATETIME(6) DEFAULT CURRENT_TIMESTAMP,
CLUSTER PRIMARY KEY("ID_")) STORAGE(ON "MAIN", CLUSTERBTR) ;

-- 2. storage-xa模块所在数据库 新建STORAGE_TBL表并新增一条数据
CREATE TABLE "SYSDBA"."STORAGE_TBL"
(
"ID" INT NOT NULL,
"COMMODITY_CODE" VARCHAR(255) DEFAULT NULL,
"COUNT" INT DEFAULT '0',
CLUSTER PRIMARY KEY("ID"),
CONSTRAINT "STORAGE_TBL_COMMODITY_CODE" UNIQUE("COMMODITY_CODE")) STORAGE(ON "MAIN", CLUSTERBTR) ;
insert into "SYSDBA"."STORAGE_TBL"("ID", "COMMODITY_CODE", "COUNT")  VALUES(1,'C100000','10000');

-- 3. order-xa模块所在数据库   新建ORDER_TBL表
CREATE TABLE "SYSDBA"."ORDER_TBL"
(
"ID" BIGINT NOT NULL,
"USER_ID" VARCHAR(255) DEFAULT NULL,
"COMMODITY_CODE" VARCHAR(255) DEFAULT NULL,
"COUNT" INT DEFAULT '0',
"MONEY" INT DEFAULT '0',
"CREATE_TIME" DATETIME(6) DEFAULT CURRENT_TIMESTAMP,
CLUSTER PRIMARY KEY("ID")) STORAGE(ON "MAIN", CLUSTERBTR) ;

-- 4. account-xa模块所在数据库 新建ACCOUNT_TBL表并新增一条数据
 CREATE TABLE "SYSDBA"."ACCOUNT_TBL"
(
"ID" INT NOT NULL,
"USER_ID" VARCHAR(255) DEFAULT NULL,
"MONEY" INT DEFAULT '0',
CLUSTER PRIMARY KEY("ID")) STORAGE(ON "MAIN", CLUSTERBTR) ;
insert into "SYSDBA"."ACCOUNT_TBL"("ID", "USER_ID", "MONEY") VALUES(1, 'U100000', 600);

3. 搭建项目

  1. 启动seata-xa-original项目
  2. 配置项目中每个模块连接数据库的连接在application.properties文件中
#DM
spring.datasource.url=jdbc:dm://127.0.0.1:5238/
spring.datasource.driver-class-name=dm.jdbc.driver.DmDriver
spring.datasource.username=SYSDBA
spring.datasource.password=SYSDBA

4. 存在的问题

每个模块在不同的库,没法保证事务的一致性。所以打算采用Seata分布式事务框架

资料包中seata-xa-original.zip为这块的代码包

2.DM数据库支持Seata事务

2.1 流程分析

因Seata事务框架的AT模式还不支持Dm数据库但支持Oracle数据库,所以整合过程中需修改数据库添加对Oracle的支持。

请添加图片描述

2.2 修改DM数据库的配置

1.更新jdbc驱动

项目中使用dm的jdbc版本看图。不建议使用的jdbc版本比这个版本低。

请添加图片描述

2.修改dm.svc.conf配置文件

## 添加下面这两个属性  第一个是兼容mysql 第二个事屏蔽关键字
COMPATIBLE_MODE=(oracle)
KEY_WORDS=(context)

请添加图片描述
请添加图片描述

3.修改dm.ini文件

所有数据库实例修都需要改

COMPATIBLE_MODE     = 2   #Server compatible mode, 0:none, 1:SQL92, 2:Oracle, 3:MS SQL Server, 4:MySQL, 5:DM6, 6:Teradata

以上配置修改完以后,需要重启数据库

2.3 搭建TC端

介绍了 seata 事务的三个模块:TC(事务协调器)、TM(事务管理器)和RM(资源管理器),其中 TM 和 RM 是嵌⼊在业务应⽤中的,⽽ TC 则是⼀个独⽴服务。

下载Server端

最新版本下载地址:https://github.com/seata/seata/releases

  1. 官网下载:1.3.0版本的下载地址:https://github.com/seata/seata/releases/tag/v1.3.0
  2. 在资料包种已经下载好了seata-server-1.3.0.zip解压即可。
    请添加图片描述

配置Server端

Server端存储模式(store.mode)现有file、db、redis三种(后续将引入raft,mongodb),file模式无需改动,直接启动即可,

  • file模式为单机模式,全局事务会话信息内存中读写并持久化本地文件bin目录下的root.data,性能较高;

  • db模式为高可用模式,全局事务会话信息通过db共享,相应性能差些;

  • redis模式Seata-Server 1.3及以上版本支持,性能较高,存在事务信息丢失风险,请提前配置合适当前场景的redis持久化配置.

File模式直连配置

主要关注conf文件夹下的registry.conf文件以及file.conf文件。

采用File直连模式registry.conf文件无需改动,需要在file.conf中添加事务分组。

registry.conf
registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "file"

  file {
    name = "file.conf"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "file"

  file {
    name = "file.conf"
  }
}
file.conf

在server属性中新增这段值vgroup_mapping.seata-xa=“default”。即添加事务分组。需与TM/RM端配置的一致。

transport {
  # tcp udt unix-domain-socket
  type = "TCP"
  #NIO NATIVE
  server = "NIO"
  #enable heartbeat
  heartbeat = true
  # the client batch send request enable
  enableClientBatchSendRequest = false
  #thread factory for netty
  threadFactory {
    bossThreadPrefix = "NettyBoss"
    workerThreadPrefix = "NettyServerNIOWorker"
    serverExecutorThreadPrefix = "NettyServerBizHandler"
    shareBossWorker = false
    clientSelectorThreadPrefix = "NettyClientSelector"
    clientSelectorThreadSize = 1
    clientWorkerThreadPrefix = "NettyClientWorkerThread"
    # netty boss thread size,will not be used for UDT
    bossThreadSize = 1
    #auto default pin or 8
    workerThreadSize = "default"
  }
  shutdown {
    # when destroy server, wait seconds
    wait = 3
  }
  serialization = "seata"
  compressor = "none"
}

## transaction log store, only used in server side
store {
  ## store mode: file、db
  mode = "file"
  ## file store property
  file {
    ## store location dir
    dir = "sessionStore"
    # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
    maxBranchSessionSize = 16384
    # globe session size , if exceeded throws exceptions
    maxGlobalSessionSize = 512
    # file buffer size , if exceeded allocate new buffer
    fileWriteBufferCacheSize = 16384
    # when recover batch read size
    sessionReloadReadSize = 100
    # async, sync
    flushDiskMode = async
  }

}
## server configuration, only used in server side
server {
  recovery {
    #schedule committing retry period in milliseconds
    committingRetryPeriod = 1000
    #schedule asyn committing retry period in milliseconds
    asynCommittingRetryPeriod = 1000
    #schedule rollbacking retry period in milliseconds
    rollbackingRetryPeriod = 1000
    #schedule timeout retry period in milliseconds
    timeoutRetryPeriod = 1000
  }
  undo {
    logSaveDays = 7
    #schedule delete expired undo_log in milliseconds
    logDeletePeriod = 86400000
  }
  #check auth
  enableCheckAuth = true
  #unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
  maxCommitRetryTimeout = "-1"
  maxRollbackRetryTimeout = "-1"
  rollbackRetryTimeoutUnlockEnable = false
  # 新增的这段,设置事务分组
  vgroup_mapping.seata-xa="default"
}

## metrics configuration, only used in server side
metrics {
  enabled = false
  registryType = "compact"
  # multi exporters use comma divided
  exporterList = "prometheus"
  exporterPrometheusPort = 9898
}

2.4 TM/RM端整合Seata

Seata事务框架在AT 模式下在RM端需要 UNDO_LOG 表,来记录每个RM的事务信息,主要包含数据修改前,后的相关信息,⽤于回滚处理,所以在所有数据库中分别执⾏.

CREATE TABLE "SYSDBA"."UNDO_LOG"
(
"ID" BIGINT NOT NULL,
"BRANCH_ID" BIGINT NOT NULL,
"XID" VARCHAR(100) NOT NULL,
"CONTEXT" VARCHAR(150) NOT NULL,
"ROLLBACK_INFO" BLOB NOT NULL,
"LOG_STATUS" INT NOT NULL,
"LOG_CREATED" DATETIME(6) NOT NULL,
"LOG_MODIFIED" DATETIME(6) NOT NULL,
NOT CLUSTER PRIMARY KEY("ID"),
CONSTRAINT "UX_UNDO_LOG" UNIQUE("XID", "BRANCH_ID")) STORAGE(ON "MAIN", CLUSTERBTR) ;

RM(事务管理器)端整合Seata与TM(事务管理器)端步骤类似,只不过不需要在⽅法添加@GlobalTransactional注解,针对我们⼯程lagou_bussiness是事务的发起者,所以是TM端,其它⼯程为RM端. 所以我们只需要在lagou_common_db完成前4步骤即可

1.引入seata依赖

  1. 修改父pom.xml文件,锁定seata的版本
   <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Greenwich.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <!--SCA -->
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>2.1.0.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>druid-spring-boot-starter</artifactId>
                <version>1.1.22</version>
            </dependency>
            <!--seata版本管理, ⽤于锁定⾼版本的seata -->
            <dependency>
                <groupId>io.seata</groupId>
                <artifactId>seata-all</artifactId>
                <version>1.3.0</version>
            </dependency>

        </dependencies>
    </dependencyManagement>
  1. 修改每个模块的pom.xml文件引入seata的依赖

因为原来的是旧版本,所以需要引入新的seata的版本依赖

请添加图片描述

        <!--seata依赖-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-alibaba-seata</artifactId>
            <!--排除低版本seata依赖-->
            <exclusions>
                <exclusion>
                    <groupId>io.seata</groupId>
                    <artifactId>seata-all</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--添加⾼版本seata依赖-->
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-all</artifactId>
            <version>1.3.0</version>
        </dependency>

2.引入注册中心文件

每个模块的resources目录下引入Seata事务Client客户端的registry.conf文件。又因为注册中心采用的直连模式,所以还需要引入file.conf

这两个文件可以参考资料包。可以看图:

请添加图片描述

3.配置连接事务组

新增每个模块的事务组,在每个项目的application.properties中添加以下信息

spring.cloud.alibaba.seata.txServiceGroup=seata-xa
logging.level.io.seata=debug
logging.level.io.seata.core.rpc=warn

请添加图片描述

4.修改数据源url兼容Oracle

每个模块兼容Oracle数据库,所以需要修改数据源的Url连接。

修改每个模块的url连接。下面是一个例子,注意端口号

spring.datasource.url=jdbc:oracle:thin:@localhost:5237
spring.datasource.driverClassName=dm.jdbc.driver.DmDriver
spring.datasource.username=SYSDBA
spring.datasource.password=SYSDBA

5.添加seata代理数据源

seata事务框架的AT模式需要操作数据源,所以我们把数据源对象代理给seata框架。

  1. 在每个模块Application启动类同目录下新建数据源对象,
  2. 修改每个模块的Application类的扫描。

下面是修改Storage模块的例子

@Configuration
public class StorageDataSourceConfiguration {

    /**
     * 使⽤druid连接池
     *
     * @return
     */
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource druidDataSource() {
        return new DruidDataSource();
    }

    /**
     * 设置数据源代理-,完成分⽀事务注册/事务提交与回滚等操作
     *
     * @param druidDataSource
     * @return
     */
    @Primary //设置⾸选数据源对象
    @Bean("dataSource")
    public DataSourceProxy dataSource(DataSource druidDataSource) {
        return new DataSourceProxy(druidDataSource);
    }

}
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class,
        scanBasePackages = "com.dameng")
public class StorageXAApplication {

    public static void main(String[] args) {
        SpringApplication.run(StorageXAApplication.class, args);
    }
}

6.修改seata源码对达梦的兼容

  1. 在每个模块中配置新建目录com.dameng.rm.datasource.util以及XAUtils类。
  2. 修改每个模块的启动类,让其启动项目时替换掉源代码包中的XAUtils类。
package com.dameng.rm.datasource.util;

import com.alibaba.druid.util.JdbcUtils;
import com.alibaba.druid.util.MySqlUtils;
import com.alibaba.druid.util.PGUtils;
import io.seata.rm.BaseDataSourceResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.sql.XAConnection;
import javax.transaction.xa.XAException;
import java.lang.reflect.Constructor;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.SQLException;

public class XAUtils {

    private static final Logger LOGGER = LoggerFactory.getLogger(XAUtils.class);

    public static String getDbType(String jdbcUrl, String driverClassName) {
        return JdbcUtils.getDbType(jdbcUrl, driverClassName);
    }

    public static XAConnection createXAConnection(Connection physicalConn, BaseDataSourceResource dataSourceResource) throws SQLException {
        return createXAConnection(physicalConn, dataSourceResource.getDriver(), dataSourceResource.getDbType());
    }

    public static XAConnection createXAConnection(Connection physicalConn, Driver driver, String dbType) throws SQLException {
        if (JdbcUtils.ORACLE.equals(dbType)) {
            try {
                // https://github.com/alibaba/druid/issues/3707
                // before Druid issue fixed, just make ORACLE XA connection in my way.
                // return OracleUtils.OracleXAConnection(physicalConn);
                String physicalConnClassName = physicalConn.getClass().getName();
                if ("oracle.jdbc.driver.T4CConnection".equals(physicalConnClassName)) {
                    return createOracleXAConnection(physicalConn, "oracle.jdbc.driver.T4CXAConnection");
                } else {
                    return createOracleXAConnection(physicalConn, "oracle.jdbc.xa.client.OracleXAConnection");
                }
            } catch (XAException xae) {
                throw new SQLException("create xaConnection error", xae);
            }
        }

        if (JdbcUtils.DM.equals(dbType)) {
            try {
//                String physicalConnClassName = physicalConn.getClass().getName();
//                if ("dm.jdbc.driver.DmdbConnection".equals(physicalConnClassName)) {
//                    return createDMXAConnection(physicalConn, "dm.jdbc.driver.DmdbConnection");
//                } else {
//                    return createDMXAConnection(physicalConn, "dm.jdbc.driver.DmdbXAConnection");
//                }
            	return createDMXAConnection(physicalConn, "dm.jdbc.driver.DmdbXAConnection");
            } catch (XAException xae) {
                throw new SQLException("create xaConnection error", xae);
            }
        }

        if (JdbcUtils.MYSQL.equals(dbType) || JdbcUtils.MARIADB.equals(dbType)) {
            return MySqlUtils.createXAConnection(driver, physicalConn);
        }

        if (JdbcUtils.POSTGRESQL.equals(dbType)) {
            return PGUtils.createXAConnection(physicalConn);
        }

        throw new SQLException("xa not support dbType: " + dbType);
    }

    private static XAConnection createOracleXAConnection(Connection physicalConnection, String xaConnectionClassName) throws XAException, SQLException {
        try {
            Class xaConnectionClass = Class.forName(xaConnectionClassName);
            Constructor<XAConnection> constructor = xaConnectionClass.getConstructor(Connection.class);
            constructor.setAccessible(true);
            return constructor.newInstance(physicalConnection);
        } catch (Exception e) {
            LOGGER.warn("Failed to create Oracle XA Connection " + xaConnectionClassName + " on " + physicalConnection);
            if (e instanceof XAException) {
                throw (XAException) e;
            } else {
                throw new SQLException(e);
            }
        }

    }

    private static XAConnection createDMXAConnection(Connection physicalConnection, String xaConnectionClassName) throws XAException, SQLException {
        try {
            Class xaConnectionClass = Class.forName(xaConnectionClassName);
            Constructor<XAConnection> constructor = xaConnectionClass.getConstructor(Connection.class);
            constructor.setAccessible(true);
            return constructor.newInstance(physicalConnection);
        } catch (Exception e) {
            LOGGER.warn("Failed to create DM XA Connection " + xaConnectionClassName + " on " + physicalConnection);
            if (e instanceof XAException) {
                throw (XAException) e;
            } else {
                throw new SQLException(e);
            }
        }

    }
}

在启动类上使用@ComponentScan注解,使其项目启动加载时使用我们本机修改的类

@SpringBootApplication(exclude = DataSourceAutoConfiguration.class,
        scanBasePackages = "com.dameng")
@ComponentScan(excludeFilters = {
        @ComponentScan.Filter(type =
                FilterType.ASSIGNABLE_TYPE, classes = {
                XAUtils.class})})
public class StorageXAApplication {

    public static void main(String[] args) {
        SpringApplication.run(StorageXAApplication.class, args);
    }
}

7.添加注解@GlobalTransactional

Business为Seata事务的TM,所以在方法上添加@GlobalTransactional注解

    @GlobalTransactional(name = "sale", timeoutMills = 100000, rollbackFor = Exception.class)
    public void execWork(String USER_ID, String Storage_Code, Integer orderCount) {
        //记录本地事务
        int update = jdbcTemplate.update("insert into BIZ_LOG(id_) values(?)", UUID.randomUUID().toString());

        //扣减商品库存
        String storageResult = storageFeignClient.consumeStorage(Storage_Code, orderCount);
        if (FAIL.equals(storageResult)) {
            throw new RuntimeException("商品报错回滚...");
        }

        //扣减订单库存
        String orderResult = orderFeignClient.createOrder(USER_ID, Storage_Code, orderCount);
        if (FAIL.equals(orderResult)) {
            throw new RuntimeException("订单报错回滚...");
        }
    }

2.5 启动项目

1. 启动 Seata服务(TC端)

进入到seata的bin目录seata\bin下管理员执行seata-server.bat文件即可。

服务启动后默认端口为8091。

注意:

  1. 因为采用直连模式,会在bin目录下生成sessionStore文件,每次启动前建议删除。

  2. 有时候客户端会卡住,按一下回车键刷新下日志就好了

请添加图片描述

2. 启动business-xa服务(TM端)

启动 com.dameng.sample.BusinessXAApplication 服务。

3. 启动storage-xa服务 (RM端)

启动 com.dameng.sample.StorageXAApplication服务。

4. 启动order-xa服务(RM端)

启动 com.dameng.sample.OrderXAApplication服务。

5. 启动account-xa服务(RM端)

启动 com.dameng.sample.AccountXAApplication服务。

项目启动后,查看seata服务端日志,查看服务是否已经注册到Seata服务中

请添加图片描述

6. 测试

测试成功

在浏览器中输入http://localhost:8084/execWork?orderCount=1,查看库存,订单,金额是否正常。

请添加图片描述

测试回滚

修改order模块中service代码如图,使代码报异常。

请添加图片描述

请添加图片描述

资料包中seata-xa-final.zip为整合以后的包

问题整理

1. endpoint format should like ip:port

java.lang.IllegalArgumentException: endpoint format should like ip:port
	at io.seata.discovery.registry.FileRegistryServiceImpl.lookup(FileRegistryServiceImpl.java:95) ~[seata-all-1.3.0.jar:1.3.0]
	at io.seata.core.rpc.netty.NettyClientChannelManager.getAvailServerList(NettyClientChannelManager.java:217) ~[seata-all-1.3.0.jar:1.3.0]
	at io.seata.core.rpc.netty.NettyClientChannelManager.reconnect(NettyClientChannelManager.java:162) ~[seata-all-1.3.0.jar:1.3.0]
	at io.seata.core.rpc.netty.RmNettyRemotingClient.registerResource(RmNettyRemotingClient.java:181) [seata-all-1.3.0.jar:1.3.0]
	at io.seata.rm.AbstractResourceManager.registerResource(AbstractResourceManager.java:121) [seata-all-1.3.0.jar:1.3.0]
	at io.seata.rm.datasource.DataSourceManager.registerResource(DataSourceManager.java:146) [seata-all-1.3.0.jar:1.3.0]

解决办法

  1. seata(TC端)事务组与java模块(RM端)事务组不同需自己检查。
  2. seata(TC端)的file.conf文件的server属性vgroupMapping配置名有问题自行检查修改。

2. io.seata.core.exception.TmTransactionException: RPC timeout

请添加图片描述

解决办法

  1. 即便已经按照顺序启动,seata也提示注册。因为电脑内存等原因实际情况还是没有注册上。重新启动一遍服务即可。
  2. seata配置有问题,seata控制台可能都没有注册上。检查seata的配置文件以及该服务的配置是否正确。

3.java.sql.SQLException: not support oracle driver 8.1

1.达梦数据库版本在DM8 1.2.38以下需要更换为2021年8月以后的版本。
2. 数据库在linux,应用系统在window的IDEA中,会出现window项目启动的时候 驱动包识别的是本台机器上的,识别不到数据库服务器上的dm.svc.conf的配置内容,所以window需要放到需要在指定目录下放dmsvc.conf文件。

4.无法解析的成员访问表达式[UNDO_LOG_SEQ.NEXTVAL]

工具包中整理初始化sql的时候,初始化序列的SQL语句忘记整理了。
CREATE SEQUENCE UNDO_LOG_SEQ START WITH 1 INCREMENT BY 1;
在数据库中在执行这句SQL语句就好了。

资料包

链接:https://pan.baidu.com/s/1Tspfh_AH_al_qTzOoQgm0Q 提取码:g03w

请添加图片描述

-- 查询blob字段的值
select utl_raw.cast_to_varchar2(dbms_lob.substr(ROLLBACK_INFO)) from "SYSDBA"."UNDO_LOG";

达梦支持

=======================================

有任何问题请到技术社区反馈。

24小时免费服务热线:400 991 6599

达梦技术社区:https://eco.dameng.com

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐