达梦数据库DM8支持Seata事务框架
1. 用例说明用户购买商品的业务逻辑。整个业务逻辑由3个微服务提供支持:仓储服务:对给定的商品扣除仓储数量。订单服务:根据采购需求创建订单。帐户服务:从用户帐户中扣除余额。1.1 项目的架构图1.2 初始项目搭建1. 环境介绍每个模块一个库,也就是需要4个库。模块名称项目端口号数据库端口号business-xa80845237storage-xa80815238order-xa80825239ac
1. 用例说明
用户购买商品的业务逻辑。整个业务逻辑由3个微服务提供支持:
- 仓储服务:对给定的商品扣除仓储数量。
- 订单服务:根据采购需求创建订单。
- 帐户服务:从用户帐户中扣除余额。
1.1 项目的架构图
1.2 初始项目搭建
1. 环境介绍
每个模块一个库,也就是需要4个库。
模块名称 | 项目端口号 | 数据库端口号 |
---|---|---|
business-xa | 8084 | 5237 |
storage-xa | 8081 | 5238 |
order-xa | 8082 | 5239 |
account-xa | 8083 | 5240 |
2. 初始表数据
-- 1. business-xa模块所在数据库 新建BIZ_LOG表
CREATE TABLE "SYSDBA"."BIZ_LOG"
(
"ID_" VARCHAR(64) NOT NULL,
"OP_DATETIME" DATETIME(6) DEFAULT CURRENT_TIMESTAMP,
CLUSTER PRIMARY KEY("ID_")) STORAGE(ON "MAIN", CLUSTERBTR) ;
-- 2. storage-xa模块所在数据库 新建STORAGE_TBL表并新增一条数据
CREATE TABLE "SYSDBA"."STORAGE_TBL"
(
"ID" INT NOT NULL,
"COMMODITY_CODE" VARCHAR(255) DEFAULT NULL,
"COUNT" INT DEFAULT '0',
CLUSTER PRIMARY KEY("ID"),
CONSTRAINT "STORAGE_TBL_COMMODITY_CODE" UNIQUE("COMMODITY_CODE")) STORAGE(ON "MAIN", CLUSTERBTR) ;
insert into "SYSDBA"."STORAGE_TBL"("ID", "COMMODITY_CODE", "COUNT") VALUES(1,'C100000','10000');
-- 3. order-xa模块所在数据库 新建ORDER_TBL表
CREATE TABLE "SYSDBA"."ORDER_TBL"
(
"ID" BIGINT NOT NULL,
"USER_ID" VARCHAR(255) DEFAULT NULL,
"COMMODITY_CODE" VARCHAR(255) DEFAULT NULL,
"COUNT" INT DEFAULT '0',
"MONEY" INT DEFAULT '0',
"CREATE_TIME" DATETIME(6) DEFAULT CURRENT_TIMESTAMP,
CLUSTER PRIMARY KEY("ID")) STORAGE(ON "MAIN", CLUSTERBTR) ;
-- 4. account-xa模块所在数据库 新建ACCOUNT_TBL表并新增一条数据
CREATE TABLE "SYSDBA"."ACCOUNT_TBL"
(
"ID" INT NOT NULL,
"USER_ID" VARCHAR(255) DEFAULT NULL,
"MONEY" INT DEFAULT '0',
CLUSTER PRIMARY KEY("ID")) STORAGE(ON "MAIN", CLUSTERBTR) ;
insert into "SYSDBA"."ACCOUNT_TBL"("ID", "USER_ID", "MONEY") VALUES(1, 'U100000', 600);
3. 搭建项目
- 启动
seata-xa-original
项目 - 配置项目中每个模块连接数据库的连接在
application.properties
文件中
#DM
spring.datasource.url=jdbc:dm://127.0.0.1:5238/
spring.datasource.driver-class-name=dm.jdbc.driver.DmDriver
spring.datasource.username=SYSDBA
spring.datasource.password=SYSDBA
4. 存在的问题
每个模块在不同的库,没法保证事务的一致性。所以打算采用Seata分布式事务框架
资料包中seata-xa-original.zip为这块的代码包
2.DM数据库支持Seata事务
2.1 流程分析
因Seata事务框架的AT模式还不支持Dm数据库但支持Oracle数据库,所以整合过程中需修改数据库添加对Oracle的支持。
2.2 修改DM数据库的配置
1.更新jdbc驱动
项目中使用dm的jdbc版本看图。不建议使用的jdbc版本比这个版本低。
2.修改dm.svc.conf配置文件
## 添加下面这两个属性 第一个是兼容mysql 第二个事屏蔽关键字
COMPATIBLE_MODE=(oracle)
KEY_WORDS=(context)
3.修改dm.ini文件
所有数据库实例修都需要改
COMPATIBLE_MODE = 2 #Server compatible mode, 0:none, 1:SQL92, 2:Oracle, 3:MS SQL Server, 4:MySQL, 5:DM6, 6:Teradata
以上配置修改完以后,需要重启数据库
2.3 搭建TC端
介绍了 seata 事务的三个模块:TC(事务协调器)、TM(事务管理器)和RM(资源管理器),其中 TM 和 RM 是嵌⼊在业务应⽤中的,⽽ TC 则是⼀个独⽴服务。
下载Server端
最新版本下载地址:https://github.com/seata/seata/releases
- 官网下载:1.3.0版本的下载地址:https://github.com/seata/seata/releases/tag/v1.3.0
- 在资料包种已经下载好了
seata-server-1.3.0.zip
解压即可。
配置Server端
Server端存储模式(store.mode)现有file、db、redis三种(后续将引入raft,mongodb),file模式无需改动,直接启动即可,
-
file模式为单机模式,全局事务会话信息内存中读写并持久化本地文件bin目录下的root.data,性能较高;
-
db模式为高可用模式,全局事务会话信息通过db共享,相应性能差些;
-
redis模式Seata-Server 1.3及以上版本支持,性能较高,存在事务信息丢失风险,请提前配置合适当前场景的redis持久化配置.
File模式直连配置
主要关注conf文件夹下的registry.conf
文件以及file.conf
文件。
采用File直连模式registry.conf文件无需改动,需要在file.conf中添加事务分组。
registry.conf
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "file"
file {
name = "file.conf"
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3
type = "file"
file {
name = "file.conf"
}
}
file.conf
在server属性中新增这段值vgroup_mapping.seata-xa=“default”。即添加事务分组。需与TM/RM端配置的一致。
transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
# the client batch send request enable
enableClientBatchSendRequest = false
#thread factory for netty
threadFactory {
bossThreadPrefix = "NettyBoss"
workerThreadPrefix = "NettyServerNIOWorker"
serverExecutorThreadPrefix = "NettyServerBizHandler"
shareBossWorker = false
clientSelectorThreadPrefix = "NettyClientSelector"
clientSelectorThreadSize = 1
clientWorkerThreadPrefix = "NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
bossThreadSize = 1
#auto default pin or 8
workerThreadSize = "default"
}
shutdown {
# when destroy server, wait seconds
wait = 3
}
serialization = "seata"
compressor = "none"
}
## transaction log store, only used in server side
store {
## store mode: file、db
mode = "file"
## file store property
file {
## store location dir
dir = "sessionStore"
# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
maxBranchSessionSize = 16384
# globe session size , if exceeded throws exceptions
maxGlobalSessionSize = 512
# file buffer size , if exceeded allocate new buffer
fileWriteBufferCacheSize = 16384
# when recover batch read size
sessionReloadReadSize = 100
# async, sync
flushDiskMode = async
}
}
## server configuration, only used in server side
server {
recovery {
#schedule committing retry period in milliseconds
committingRetryPeriod = 1000
#schedule asyn committing retry period in milliseconds
asynCommittingRetryPeriod = 1000
#schedule rollbacking retry period in milliseconds
rollbackingRetryPeriod = 1000
#schedule timeout retry period in milliseconds
timeoutRetryPeriod = 1000
}
undo {
logSaveDays = 7
#schedule delete expired undo_log in milliseconds
logDeletePeriod = 86400000
}
#check auth
enableCheckAuth = true
#unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
maxCommitRetryTimeout = "-1"
maxRollbackRetryTimeout = "-1"
rollbackRetryTimeoutUnlockEnable = false
# 新增的这段,设置事务分组
vgroup_mapping.seata-xa="default"
}
## metrics configuration, only used in server side
metrics {
enabled = false
registryType = "compact"
# multi exporters use comma divided
exporterList = "prometheus"
exporterPrometheusPort = 9898
}
2.4 TM/RM端整合Seata
Seata事务框架在AT 模式下在RM端需要 UNDO_LOG 表
,来记录每个RM的事务信息,主要包含数据修改前,后的相关信息,⽤于回滚处理,所以在所有数据库中分别执⾏.
CREATE TABLE "SYSDBA"."UNDO_LOG"
(
"ID" BIGINT NOT NULL,
"BRANCH_ID" BIGINT NOT NULL,
"XID" VARCHAR(100) NOT NULL,
"CONTEXT" VARCHAR(150) NOT NULL,
"ROLLBACK_INFO" BLOB NOT NULL,
"LOG_STATUS" INT NOT NULL,
"LOG_CREATED" DATETIME(6) NOT NULL,
"LOG_MODIFIED" DATETIME(6) NOT NULL,
NOT CLUSTER PRIMARY KEY("ID"),
CONSTRAINT "UX_UNDO_LOG" UNIQUE("XID", "BRANCH_ID")) STORAGE(ON "MAIN", CLUSTERBTR) ;
RM(事务管理器)端整合Seata与TM(事务管理器)端步骤类似,只不过不需要在⽅法添加@GlobalTransactional注解,针对我们⼯程lagou_bussiness是事务的发起者,所以是TM端,其它⼯程为RM端. 所以我们只需要在lagou_common_db完成前4步骤即可
1.引入seata依赖
- 修改父pom.xml文件,锁定seata的版本
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Greenwich.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!--SCA -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2.1.0.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.22</version>
</dependency>
<!--seata版本管理, ⽤于锁定⾼版本的seata -->
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>1.3.0</version>
</dependency>
</dependencies>
</dependencyManagement>
- 修改每个模块的pom.xml文件引入seata的依赖
因为原来的是旧版本,所以需要引入新的seata的版本依赖
<!--seata依赖-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-seata</artifactId>
<!--排除低版本seata依赖-->
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--添加⾼版本seata依赖-->
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>1.3.0</version>
</dependency>
2.引入注册中心文件
在每个模块的resources目录下引入Seata事务Client客户端的registry.conf
文件。又因为注册中心采用的直连模式,所以还需要引入file.conf
。
这两个文件可以参考资料包。可以看图:
3.配置连接事务组
新增每个模块的事务组,在每个项目的application.properties
中添加以下信息
spring.cloud.alibaba.seata.txServiceGroup=seata-xa
logging.level.io.seata=debug
logging.level.io.seata.core.rpc=warn
4.修改数据源url兼容Oracle
每个模块兼容Oracle数据库,所以需要修改数据源的Url连接。
修改每个模块的url连接。下面是一个例子,注意端口号
spring.datasource.url=jdbc:oracle:thin:@localhost:5237
spring.datasource.driverClassName=dm.jdbc.driver.DmDriver
spring.datasource.username=SYSDBA
spring.datasource.password=SYSDBA
5.添加seata代理数据源
seata事务框架的AT模式需要操作数据源,所以我们把数据源对象代理给seata框架。
- 在每个模块Application启动类同目录下新建数据源对象,
- 修改每个模块的Application类的扫描。
下面是修改Storage模块的例子
@Configuration
public class StorageDataSourceConfiguration {
/**
* 使⽤druid连接池
*
* @return
*/
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource druidDataSource() {
return new DruidDataSource();
}
/**
* 设置数据源代理-,完成分⽀事务注册/事务提交与回滚等操作
*
* @param druidDataSource
* @return
*/
@Primary //设置⾸选数据源对象
@Bean("dataSource")
public DataSourceProxy dataSource(DataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
}
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class,
scanBasePackages = "com.dameng")
public class StorageXAApplication {
public static void main(String[] args) {
SpringApplication.run(StorageXAApplication.class, args);
}
}
6.修改seata源码对达梦的兼容
- 在每个模块中配置新建目录
com.dameng.rm.datasource.util
以及XAUtils类。 - 修改每个模块的启动类,让其启动项目时替换掉源代码包中的XAUtils类。
package com.dameng.rm.datasource.util;
import com.alibaba.druid.util.JdbcUtils;
import com.alibaba.druid.util.MySqlUtils;
import com.alibaba.druid.util.PGUtils;
import io.seata.rm.BaseDataSourceResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.sql.XAConnection;
import javax.transaction.xa.XAException;
import java.lang.reflect.Constructor;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.SQLException;
public class XAUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(XAUtils.class);
public static String getDbType(String jdbcUrl, String driverClassName) {
return JdbcUtils.getDbType(jdbcUrl, driverClassName);
}
public static XAConnection createXAConnection(Connection physicalConn, BaseDataSourceResource dataSourceResource) throws SQLException {
return createXAConnection(physicalConn, dataSourceResource.getDriver(), dataSourceResource.getDbType());
}
public static XAConnection createXAConnection(Connection physicalConn, Driver driver, String dbType) throws SQLException {
if (JdbcUtils.ORACLE.equals(dbType)) {
try {
// https://github.com/alibaba/druid/issues/3707
// before Druid issue fixed, just make ORACLE XA connection in my way.
// return OracleUtils.OracleXAConnection(physicalConn);
String physicalConnClassName = physicalConn.getClass().getName();
if ("oracle.jdbc.driver.T4CConnection".equals(physicalConnClassName)) {
return createOracleXAConnection(physicalConn, "oracle.jdbc.driver.T4CXAConnection");
} else {
return createOracleXAConnection(physicalConn, "oracle.jdbc.xa.client.OracleXAConnection");
}
} catch (XAException xae) {
throw new SQLException("create xaConnection error", xae);
}
}
if (JdbcUtils.DM.equals(dbType)) {
try {
// String physicalConnClassName = physicalConn.getClass().getName();
// if ("dm.jdbc.driver.DmdbConnection".equals(physicalConnClassName)) {
// return createDMXAConnection(physicalConn, "dm.jdbc.driver.DmdbConnection");
// } else {
// return createDMXAConnection(physicalConn, "dm.jdbc.driver.DmdbXAConnection");
// }
return createDMXAConnection(physicalConn, "dm.jdbc.driver.DmdbXAConnection");
} catch (XAException xae) {
throw new SQLException("create xaConnection error", xae);
}
}
if (JdbcUtils.MYSQL.equals(dbType) || JdbcUtils.MARIADB.equals(dbType)) {
return MySqlUtils.createXAConnection(driver, physicalConn);
}
if (JdbcUtils.POSTGRESQL.equals(dbType)) {
return PGUtils.createXAConnection(physicalConn);
}
throw new SQLException("xa not support dbType: " + dbType);
}
private static XAConnection createOracleXAConnection(Connection physicalConnection, String xaConnectionClassName) throws XAException, SQLException {
try {
Class xaConnectionClass = Class.forName(xaConnectionClassName);
Constructor<XAConnection> constructor = xaConnectionClass.getConstructor(Connection.class);
constructor.setAccessible(true);
return constructor.newInstance(physicalConnection);
} catch (Exception e) {
LOGGER.warn("Failed to create Oracle XA Connection " + xaConnectionClassName + " on " + physicalConnection);
if (e instanceof XAException) {
throw (XAException) e;
} else {
throw new SQLException(e);
}
}
}
private static XAConnection createDMXAConnection(Connection physicalConnection, String xaConnectionClassName) throws XAException, SQLException {
try {
Class xaConnectionClass = Class.forName(xaConnectionClassName);
Constructor<XAConnection> constructor = xaConnectionClass.getConstructor(Connection.class);
constructor.setAccessible(true);
return constructor.newInstance(physicalConnection);
} catch (Exception e) {
LOGGER.warn("Failed to create DM XA Connection " + xaConnectionClassName + " on " + physicalConnection);
if (e instanceof XAException) {
throw (XAException) e;
} else {
throw new SQLException(e);
}
}
}
}
在启动类上使用@ComponentScan
注解,使其项目启动加载时使用我们本机修改的类
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class,
scanBasePackages = "com.dameng")
@ComponentScan(excludeFilters = {
@ComponentScan.Filter(type =
FilterType.ASSIGNABLE_TYPE, classes = {
XAUtils.class})})
public class StorageXAApplication {
public static void main(String[] args) {
SpringApplication.run(StorageXAApplication.class, args);
}
}
7.添加注解@GlobalTransactional
Business为Seata事务的TM,所以在方法上添加@GlobalTransactional
注解
@GlobalTransactional(name = "sale", timeoutMills = 100000, rollbackFor = Exception.class)
public void execWork(String USER_ID, String Storage_Code, Integer orderCount) {
//记录本地事务
int update = jdbcTemplate.update("insert into BIZ_LOG(id_) values(?)", UUID.randomUUID().toString());
//扣减商品库存
String storageResult = storageFeignClient.consumeStorage(Storage_Code, orderCount);
if (FAIL.equals(storageResult)) {
throw new RuntimeException("商品报错回滚...");
}
//扣减订单库存
String orderResult = orderFeignClient.createOrder(USER_ID, Storage_Code, orderCount);
if (FAIL.equals(orderResult)) {
throw new RuntimeException("订单报错回滚...");
}
}
2.5 启动项目
1. 启动 Seata服务(TC端)
进入到seata的bin目录seata\bin
下管理员执行seata-server.bat文件即可。
服务启动后默认端口为8091。
注意:
因为采用直连模式,会在bin目录下生成sessionStore文件,每次启动前建议删除。
有时候客户端会卡住,按一下回车键刷新下日志就好了
2. 启动business-xa服务(TM端)
启动 com.dameng.sample.BusinessXAApplication 服务。
3. 启动storage-xa服务 (RM端)
启动 com.dameng.sample.StorageXAApplication服务。
4. 启动order-xa服务(RM端)
启动 com.dameng.sample.OrderXAApplication服务。
5. 启动account-xa服务(RM端)
启动 com.dameng.sample.AccountXAApplication服务。
项目启动后,查看seata服务端日志,查看服务是否已经注册到Seata服务中
6. 测试
测试成功
在浏览器中输入http://localhost:8084/execWork?orderCount=1,查看库存,订单,金额是否正常。
测试回滚
修改order模块中service代码如图,使代码报异常。
资料包中seata-xa-final.zip为整合以后的包
问题整理
1. endpoint format should like ip:port
java.lang.IllegalArgumentException: endpoint format should like ip:port
at io.seata.discovery.registry.FileRegistryServiceImpl.lookup(FileRegistryServiceImpl.java:95) ~[seata-all-1.3.0.jar:1.3.0]
at io.seata.core.rpc.netty.NettyClientChannelManager.getAvailServerList(NettyClientChannelManager.java:217) ~[seata-all-1.3.0.jar:1.3.0]
at io.seata.core.rpc.netty.NettyClientChannelManager.reconnect(NettyClientChannelManager.java:162) ~[seata-all-1.3.0.jar:1.3.0]
at io.seata.core.rpc.netty.RmNettyRemotingClient.registerResource(RmNettyRemotingClient.java:181) [seata-all-1.3.0.jar:1.3.0]
at io.seata.rm.AbstractResourceManager.registerResource(AbstractResourceManager.java:121) [seata-all-1.3.0.jar:1.3.0]
at io.seata.rm.datasource.DataSourceManager.registerResource(DataSourceManager.java:146) [seata-all-1.3.0.jar:1.3.0]
解决办法
- seata(TC端)事务组与java模块(RM端)事务组不同需自己检查。
- seata(TC端)的file.conf文件的server属性
vgroupMapping
配置名有问题自行检查修改。
2. io.seata.core.exception.TmTransactionException: RPC timeout
解决办法
- 即便已经按照顺序启动,seata也提示注册。因为电脑内存等原因实际情况还是没有注册上。重新启动一遍服务即可。
- seata配置有问题,seata控制台可能都没有注册上。检查seata的配置文件以及该服务的配置是否正确。
3.java.sql.SQLException: not support oracle driver 8.1
1.达梦数据库版本在DM8 1.2.38以下需要更换为2021年8月以后的版本。
2. 数据库在linux,应用系统在window的IDEA中,会出现window项目启动的时候 驱动包识别的是本台机器上的,识别不到数据库服务器上的dm.svc.conf的配置内容,所以window需要放到需要在指定目录下放dmsvc.conf文件。
4.无法解析的成员访问表达式[UNDO_LOG_SEQ.NEXTVAL]
工具包中整理初始化sql的时候,初始化序列的SQL语句忘记整理了。
CREATE SEQUENCE UNDO_LOG_SEQ START WITH 1 INCREMENT BY 1;
在数据库中在执行这句SQL语句就好了。
资料包
链接:https://pan.baidu.com/s/1Tspfh_AH_al_qTzOoQgm0Q 提取码:g03w
-- 查询blob字段的值
select utl_raw.cast_to_varchar2(dbms_lob.substr(ROLLBACK_INFO)) from "SYSDBA"."UNDO_LOG";
达梦支持
=======================================
有任何问题请到技术社区反馈。
24小时免费服务热线:400 991 6599
达梦技术社区:https://eco.dameng.com
更多推荐
所有评论(0)