
spring sharding JDBC 动态调整数据库连接
spring sharding JDBC 动态调整数据库连接:通过重写 ShardingSphereDataSource 类来实现(完整代码见下文)。
·
原理: 通过扩展原ShardingSphereDataSource类增加updateContextMetaData方法来实现
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-jdbc-core-spring-boot-starter</artifactId>
<version>5.1.2</version>
</dependency>
spring sharding JDBC 动态调整数据库连接
通过重写ShardingSphereDataSource类来实现
代码
package org.apache.shardingsphere.driver.jdbc.core.datasource;
import com.alibaba.druid.pool.DruidDataSource;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.driver.jdbc.adapter.AbstractDataSourceAdapter;
import org.apache.shardingsphere.driver.jdbc.context.CachedDatabaseMetaData;
import org.apache.shardingsphere.driver.jdbc.context.JDBCContext;
import org.apache.shardingsphere.driver.state.DriverStateContext;
import org.apache.shardingsphere.infra.config.RuleConfiguration;
import org.apache.shardingsphere.infra.config.checker.RuleConfigurationCheckerFactory;
import org.apache.shardingsphere.infra.config.database.DatabaseConfiguration;
import org.apache.shardingsphere.infra.config.database.impl.DataSourceProvidedDatabaseConfiguration;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
import org.apache.shardingsphere.infra.config.scope.GlobalRuleConfiguration;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesCreator;
import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
import org.apache.shardingsphere.infra.instance.definition.InstanceType;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.infra.rule.builder.schema.DatabaseRulesBuilder;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderFactory;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
import org.apache.shardingsphere.sharding.api.config.rule.ShardingTableRuleConfiguration;
import org.apache.shardingsphere.spring.boot.ShardingSphereAutoConfiguration;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* @author kittlen
* @version 1.0
*/
@Slf4j
public final class ShardingSphereDataSource extends AbstractDataSourceAdapter implements AutoCloseable {

    private final String databaseName;

    private final ContextManager contextManager;

    private final JDBCContext jdbcContext;

    // Snapshot of the parameters the context manager was built from; refreshed after every
    // resource change so rule rebuilding always works against the latest data source map.
    private ContextManagerBuilderParameter contextManagerBuilderParameter;

    private ModeConfiguration modeConfiguration;

    private Collection<RuleConfiguration> ruleConfigs;

    private Properties properties;

    public ShardingSphereDataSource(final String databaseName, final ModeConfiguration modeConfig) throws SQLException {
        this.databaseName = databaseName;
        this.modeConfiguration = modeConfig;
        this.properties = new Properties();
        this.ruleConfigs = new LinkedList<>();
        contextManager = createContextManager(databaseName, modeConfig, new HashMap<>(), this.ruleConfigs, this.properties);
        jdbcContext = new JDBCContext(contextManager.getDataSourceMap(databaseName));
    }

    /**
     * Constructor invoked by
     * {@link ShardingSphereAutoConfiguration#shardingSphereDataSource(org.springframework.beans.factory.ObjectProvider, org.springframework.beans.factory.ObjectProvider)}.
     *
     * @param databaseName  logic database name
     * @param modeConfig    mode configuration
     * @param dataSourceMap initial physical data sources, keyed by data source name
     * @param ruleConfigs   rule configurations (sharding and others)
     * @param props         configuration properties, may be {@code null}
     * @throws SQLException when the context manager cannot be built
     */
    public ShardingSphereDataSource(final String databaseName, final ModeConfiguration modeConfig, final Map<String, DataSource> dataSourceMap,
                                    final Collection<RuleConfiguration> ruleConfigs, final Properties props) throws SQLException {
        checkRuleConfiguration(databaseName, ruleConfigs);
        this.modeConfiguration = modeConfig;
        this.databaseName = databaseName;
        // Fix: normalize once and store the normalized value. The original stored the raw
        // (possibly null) props, which later caused an NPE when updateContextMetaData
        // rebuilt the builder parameter from this.properties.
        this.properties = null == props ? new Properties() : props;
        contextManager = createContextManager(databaseName, modeConfig, dataSourceMap, ruleConfigs, this.properties);
        this.ruleConfigs = ruleConfigs;
        jdbcContext = new JDBCContext(contextManager.getDataSourceMap(databaseName));
    }

    /**
     * 更新现有的数据库配置 — apply data source changes to the running context.
     *
     * @param addDataSourcePropertiesMap 添加的配置 data sources to add, keyed by name
     * @param dropDataSourceNames        移除的配置 names of data sources to remove
     * @param updateSources              修改的配置 data sources whose connection properties changed
     * @param newDataNodes               new inline expression for actual data nodes, may be {@code null}
     * @throws Exception when adding or dropping resources fails
     */
    public void updateContextMetaData(final Map<String, DataSourceProperties> addDataSourcePropertiesMap, final Collection<String> dropDataSourceNames,
                                      final Map<String, DataSourceProperties> updateSources, final String newDataNodes) throws Exception {
        if (CollectionUtils.isEmpty(addDataSourcePropertiesMap) && CollectionUtils.isEmpty(dropDataSourceNames) && CollectionUtils.isEmpty(updateSources)) {
            // No resource change: only the data node expression may need reloading.
            if (!StringUtils.isEmpty(newDataNodes)) {
                rebuildShardingSphereRule(newDataNodes);
            }
            return;
        }
        Map<String, DataSource> oldDataSources = new HashMap<>(contextManager.getDataSourceMap(databaseName));
        if (!CollectionUtils.isEmpty(addDataSourcePropertiesMap)) {
            log.info("添加新的db:{}", JsonMapper.defaultMapper().toJson(addDataSourcePropertiesMap.keySet()));
            contextManager.addResource(databaseName, addDataSourcePropertiesMap);
        }
        if (!CollectionUtils.isEmpty(dropDataSourceNames)) {
            dropOldDataSources(dropDataSourceNames, oldDataSources);
        }
        if (!CollectionUtils.isEmpty(updateSources)) {
            log.info("更新db:{}", JsonMapper.defaultMapper().toJson(updateSources.keySet()));
            // Fix: drop the resources being UPDATED before re-adding them with the new
            // connection properties. The original passed dropDataSourceNames here, which
            // dropped the already-handled set twice and never closed the replaced pools.
            dropOldDataSources(updateSources.keySet(), oldDataSources);
            contextManager.addResource(databaseName, updateSources);
        }
        log.info("重新加载shardingRule");
        rebuildShardingSphereRule(newDataNodes);
        log.info("重新构建managerBuildParameter");
        Map<String, DataSource> dataSourceMap = contextManager.getDataSourceMap(databaseName);
        contextManagerBuilderParameter = builderParameter(databaseName, modeConfiguration, dataSourceMap, ruleConfigs, properties);
    }

    /**
     * 删除旧的连接配置 — drop resources from the context manager and physically close
     * the replaced Druid pools so their connections are released.
     *
     * @param dropDataSourceNames names of data sources to drop
     * @param oldDataSources      snapshot of the data sources taken before the change
     */
    private void dropOldDataSources(final Collection<String> dropDataSourceNames, final Map<String, DataSource> oldDataSources) {
        log.info("移除旧的db:{}", JsonMapper.defaultMapper().toJson(dropDataSourceNames));
        contextManager.dropResource(databaseName, dropDataSourceNames);
        dropDataSourceNames.forEach(name -> {
            DataSource dataSource = oldDataSources.get(name);
            // Only Druid pools are closed explicitly; other pool types are left to GC.
            if (dataSource instanceof DruidDataSource) {
                try {
                    log.info("关闭原dataSources:{}", name);
                    ((DruidDataSource) dataSource).close();
                } catch (Exception e) {
                    // Best effort: a failed close must not abort the reconfiguration.
                    log.warn("db:{}close时出现异常,异常为:{}", name, e.getMessage(), e);
                }
            }
        });
    }

    /** Convert a data source map into its serializable properties form. */
    private Map<String, DataSourceProperties> getDataSourcePropertiesMap(final Map<String, DataSource> dataSourceMap) {
        Map<String, DataSourceProperties> result = new LinkedHashMap<>(dataSourceMap.size(), 1);
        for (Map.Entry<String, DataSource> each : dataSourceMap.entrySet()) {
            result.put(each.getKey(), DataSourcePropertiesCreator.create(each.getValue()));
        }
        return result;
    }

    /** Validate each rule configuration with its registered checker, if one exists. */
    @SuppressWarnings("unchecked")
    private void checkRuleConfiguration(final String databaseName, final Collection<RuleConfiguration> ruleConfigs) {
        ruleConfigs.forEach(each -> RuleConfigurationCheckerFactory.findInstance(each).ifPresent(optional -> optional.check(databaseName, each)));
    }

    /** Build the context manager and remember the parameter it was built from. */
    private ContextManager createContextManager(final String databaseName, final ModeConfiguration modeConfig, final Map<String, DataSource> dataSourceMap,
                                                final Collection<RuleConfiguration> ruleConfigs, final Properties props) throws SQLException {
        ContextManagerBuilderParameter parameter = builderParameter(databaseName, modeConfig, dataSourceMap, ruleConfigs, props);
        this.contextManagerBuilderParameter = parameter;
        return ContextManagerBuilderFactory.getInstance(modeConfig).build(parameter);
    }

    /** Assemble a builder parameter for a single JDBC-mode database. */
    private ContextManagerBuilderParameter builderParameter(final String databaseName, final ModeConfiguration modeConfig, final Map<String, DataSource> dataSourceMap,
                                                            final Collection<RuleConfiguration> ruleConfigs, final Properties props) {
        return ContextManagerBuilderParameter.builder()
                .modeConfig(modeConfig)
                .databaseConfigs(Collections.singletonMap(databaseName, new DataSourceProvidedDatabaseConfiguration(dataSourceMap, ruleConfigs)))
                .globalRuleConfigs(ruleConfigs.stream().filter(each -> each instanceof GlobalRuleConfiguration).collect(Collectors.toList()))
                .props(props)
                .instanceDefinition(new InstanceDefinition(InstanceType.JDBC)).build();
    }

    /** Cache JDBC metadata from an arbitrary data source; empty when none are configured. */
    private Optional<CachedDatabaseMetaData> createCachedDatabaseMetaData(final Map<String, DataSource> dataSources) throws SQLException {
        if (dataSources.isEmpty()) {
            return Optional.empty();
        }
        try (Connection connection = dataSources.values().iterator().next().getConnection()) {
            return Optional.of(new CachedDatabaseMetaData(connection.getMetaData()));
        }
    }

    @Override
    public Connection getConnection() throws SQLException {
        return DriverStateContext.getConnection(databaseName, contextManager, jdbcContext);
    }

    @Override
    public Connection getConnection(final String username, final String password) throws SQLException {
        // Credentials are ignored: routing always goes through the configured data sources.
        return getConnection();
    }

    /**
     * Close data sources.
     *
     * @param dataSourceNames data source names to be closed
     * @throws Exception exception
     */
    public void close(final Collection<String> dataSourceNames) throws Exception {
        Map<String, DataSource> dataSourceMap = contextManager.getDataSourceMap(databaseName);
        for (String each : dataSourceNames) {
            close(dataSourceMap.get(each));
        }
        // NOTE(review): also closes the whole context manager, even for a partial name set —
        // preserved from the original implementation; confirm callers expect this.
        contextManager.close();
    }

    private void close(final DataSource dataSource) throws Exception {
        if (dataSource instanceof AutoCloseable) {
            ((AutoCloseable) dataSource).close();
        }
    }

    @Override
    public void close() throws Exception {
        close(contextManager.getDataSourceMap(databaseName).keySet());
    }

    @Override
    public int getLoginTimeout() throws SQLException {
        Map<String, DataSource> dataSourceMap = contextManager.getDataSourceMap(databaseName);
        return dataSourceMap.isEmpty() ? 0 : dataSourceMap.values().iterator().next().getLoginTimeout();
    }

    @Override
    public void setLoginTimeout(final int seconds) throws SQLException {
        for (DataSource each : contextManager.getDataSourceMap(databaseName).values()) {
            each.setLoginTimeout(seconds);
        }
    }

    /** Names of the physical data sources currently registered for this logic database. */
    public Set<String> getDataBaseNames() {
        return contextManager.getDataSourceMap(databaseName).keySet();
    }

    /**
     * 重新加载db映射 — rebuild the sharding rule so that every logic table maps onto
     * {@code newDataNodes} (e.g. {@code db-$->{0..3}}), then swap the rebuilt rules
     * into the live rule metadata in place.
     *
     * @param newDataNodes new actual-data-nodes prefix; no-op when empty
     */
    private void rebuildShardingSphereRule(final String newDataNodes) {
        if (StringUtils.isEmpty(newDataNodes)) {
            return;
        }
        log.info("重新加载db映射:{}", newDataNodes);
        // NOTE(review): Optional#get without a presence check — assumes this mode always
        // provides a persist service; verify for standalone/memory mode.
        MetaDataPersistService metaDataPersistService = contextManager.getMetaDataContexts().getPersistService().get();
        Map<String, DatabaseConfiguration> databaseConfigurationMap = getDatabaseConfigMap(Stream.of(databaseName).collect(Collectors.toList()), metaDataPersistService, contextManagerBuilderParameter);
        databaseConfigurationMap.get(databaseName).getRuleConfigurations().forEach(rule -> {
            if (rule instanceof ShardingRuleConfiguration) {
                Collection<ShardingTableRuleConfiguration> tables = ((ShardingRuleConfiguration) rule).getTables();
                // Rebuild each table rule with "<newDataNodes>.<logicTable>" as its actual data nodes,
                // carrying over every other strategy unchanged.
                List<ShardingTableRuleConfiguration> newShardingTableRuleConfigurations = tables.stream().map(table -> {
                    String newActualDataNodes = newDataNodes + "." + table.getLogicTable();
                    ShardingTableRuleConfiguration shardingTableRuleConfiguration = new ShardingTableRuleConfiguration(table.getLogicTable(), newActualDataNodes);
                    shardingTableRuleConfiguration.setDatabaseShardingStrategy(table.getDatabaseShardingStrategy());
                    shardingTableRuleConfiguration.setTableShardingStrategy(table.getTableShardingStrategy());
                    // NOTE(review): setReplaceTablePrefix is not part of stock ShardingSphere 5.1.2 —
                    // this code targets a patched/forked ShardingTableRuleConfiguration.
                    shardingTableRuleConfiguration.setReplaceTablePrefix(table.getReplaceTablePrefix());
                    shardingTableRuleConfiguration.setKeyGenerateStrategy(table.getKeyGenerateStrategy());
                    return shardingTableRuleConfiguration;
                }).collect(Collectors.toList());
                ((ShardingRuleConfiguration) rule).setTables(newShardingTableRuleConfigurations);
            }
        });
        ConfigurationProperties props = new ConfigurationProperties(metaDataPersistService.getPropsService().load());
        Collection<ShardingSphereRule> build = DatabaseRulesBuilder.build(databaseName, databaseConfigurationMap.get(databaseName), props);
        // Mutate the live rule list in place so existing references observe the new rules.
        List<ShardingSphereRule> oldRules = (List<ShardingSphereRule>) contextManager.getMetaDataContexts().getMetaData().getDatabases().get(databaseName).getRuleMetaData().getRules();
        oldRules.clear();
        oldRules.addAll(build);
    }

    /** Build a database-name -> configuration map from the persisted rule configurations. */
    private Map<String, DatabaseConfiguration> getDatabaseConfigMap(final Collection<String> databaseNames, final MetaDataPersistService metaDataPersistService,
                                                                    final ContextManagerBuilderParameter parameter) {
        Map<String, DatabaseConfiguration> result = new HashMap<>(databaseNames.size(), 1);
        databaseNames.forEach(each -> result.put(each, createDatabaseConfiguration(each, metaDataPersistService, parameter)));
        return result;
    }

    /** Combine the live data sources with the persisted rule configurations for one database. */
    private DatabaseConfiguration createDatabaseConfiguration(final String databaseName, final MetaDataPersistService metaDataPersistService,
                                                              final ContextManagerBuilderParameter parameter) {
        Map<String, DataSource> dataSources = contextManager.getDataSourceMap(databaseName);
        Collection<RuleConfiguration> databaseRuleConfigs = metaDataPersistService.getDatabaseRulePersistService().load(databaseName);
        return new DataSourceProvidedDatabaseConfiguration(dataSources, databaseRuleConfigs);
    }
}
通过调用重写的DataSource的updateContextMetaData方法来重新加载连接配置
* @param addDataSourcePropertiesMap 添加的配置
* @param dropDataSourceNames 移除的配置
* @param updateSources 修改的配置
* @param newDataNodes 新的节点使用配置
spring:
shardingsphere:
datasource:
names: ${SHARDING_DATA_SOURCE_NAMES:db-0,db-1}
db-0:
type: com.alibaba.druid.pool.DruidDataSource
driverClassName: com.mysql.jdbc.Driver
url: jdbc:mysql://${mysql.message0.host}/${mysql.message0.database}?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf-8&autoReconnect=true&useSSL=false
username: ${mysql.message0.username}
password: ${mysql.message0.password}
initialSize: 5 # 初始化大小
minIdle: 5 # 最小
maxActive: 20 # 最大
maxWait: 60000 # 获取连接等待超时的时间
...
db-1:
type: com.alibaba.druid.pool.DruidDataSource
driverClassName: com.mysql.jdbc.Driver
...
参数说明
| 参数名 | 说明 |
| --- | --- |
addDataSourcePropertiesMap | 添加的db连接,key->连接id,例如配置中的db-0,db-1,value->根据普通db连接配置构建成的DataSourceProperties |
dropDataSourceNames | 移除的db连接,key->连接id,例如配置中的db-0,db-1 |
updateSources | 修改的db连接,key->连接id,例如配置中的db-0,db-1,value->根据普通db连接配置构建成的DataSourceProperties |
| newDataNodes | 新的节点使用配置,将配置项中使用的spring.shardingsphere.datasource.names值更换为该值 |
DataSourceProperties构建类
getDataSource(),传入的值为普通的db连接配置,例如
type: com.alibaba.druid.pool.DruidDataSource
driverClassName: com.mysql.jdbc.Driver
url: jdbc:mysql://db3Ip/db3DB?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf-8&autoReconnect=true&useSSL=false
username: db3User
password: db3Pwd
initialSize: 5 # 初始化大小
minIdle: 5 # 最小
maxActive: 20 # 最大
maxWait: 60000 # 获取连接等待超时的时间
timeBetweenEvictionRunsMillis: 60000 # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒(3600000:为1小时)
minEvictableIdleTimeMillis: 300000 # 配置一个连接在池中最小生存的时间,单位是毫秒
validationQuery: select current_timestamp() #SELECT 1 FROM DUAL #用来检测连接是否有效的sql,要求是一个查询语句。如果validationQuery为null,testOnBorrow、testOnReturn、testWhileIdle都不会其作用。
testWhileIdle: true #申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效。建议配置为true,不影响性能,并且保证安全性。
testOnBorrow: false #申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。缺省值:true
testOnReturn: false #归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。缺省值:false
poolPreparedStatements: true #打开PSCache,并且指定每个连接上PSCache的大小
#是否缓存preparedStatement,也就是PSCache。PSCache对支持游标的数据库性能提升巨大,比如说oracle。在mysql5.5以下的版本中没有PSCache功能,建议关闭掉。5.5及以上版本有PSCache,建议开启。缺省值:false
maxPoolPreparedStatementPerConnectionSize: 20 # 要启用PSCache,必须配置大于0,当大于0时,poolPreparedStatements自动触发修改为true。在Druid中,不会存在Oracle下PSCache占用内存过多的问题,可以把这个数值配置大一些,比如说100。
package com.kittlen.provider.config.sharding;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesCreator;
import org.apache.shardingsphere.infra.exception.ShardingSphereException;
import org.apache.shardingsphere.infra.expr.InlineExpressionParser;
import org.apache.shardingsphere.spring.boot.datasource.AopProxyUtils;
import org.apache.shardingsphere.spring.boot.util.PropertyUtil;
import org.springframework.core.env.Environment;
import org.springframework.core.env.StandardEnvironment;
import org.springframework.jndi.JndiObjectFactoryBean;
import org.springframework.stereotype.Component;
import javax.naming.NamingException;
import javax.sql.DataSource;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
* @author kittlen
* @version 1.0
*/
public class MyShardingDataSource {

    private static final String PREFIX = "spring.shardingsphere.datasource.";

    private static final String DATA_SOURCE_NAME = "name";

    private static final String DATA_SOURCE_NAMES = "names";

    private static final String DATA_SOURCE_TYPE = "type";

    private static final String JNDI_NAME = "jndi-name";

    // Utility class holding only static factory helpers: prevent instantiation.
    private MyShardingDataSource() {
    }

    /**
     * Get data source map.
     *
     * @param environment spring boot environment
     * @return data source map
     */
    public static Map<String, DataSource> getDataSourceMap(final Environment environment) {
        Map<String, DataSource> result = new LinkedHashMap<>();
        for (String each : getDataSourceNames(environment)) {
            try {
                result.put(each, getDataSource(environment, each));
            } catch (final NamingException ex) {
                throw new ShardingSphereException("Can't find JNDI data source.", ex);
            }
        }
        return result;
    }

    /**
     * Get data source properties map.
     *
     * @param environment spring boot environment
     * @return data source properties map, keyed by data source name
     */
    public static Map<String, DataSourceProperties> getDataSourcePropertiesMap(final Environment environment) {
        Map<String, DataSourceProperties> result = new LinkedHashMap<>();
        for (String each : getDataSourceNames(environment)) {
            try {
                result.put(each, getDataSourceProperties(environment, each));
            } catch (final NamingException ex) {
                throw new ShardingSphereException("Can't find JNDI data source.", ex);
            }
        }
        return result;
    }

    /**
     * Resolve the configured data source names: "…datasource.name" takes precedence,
     * falling back to "…datasource.names"; inline expressions (db-$->{0..2}) are expanded.
     */
    private static List<String> getDataSourceNames(final Environment environment) {
        StandardEnvironment standardEnv = (StandardEnvironment) environment;
        standardEnv.setIgnoreUnresolvableNestedPlaceholders(true);
        String dataSourceNames = standardEnv.getProperty(PREFIX + DATA_SOURCE_NAME);
        if (Strings.isNullOrEmpty(dataSourceNames)) {
            dataSourceNames = standardEnv.getProperty(PREFIX + DATA_SOURCE_NAMES);
        }
        return new InlineExpressionParser(dataSourceNames).splitAndEvaluate();
    }

    /** Build one data source from the environment, via JNDI when "jndi-name" is present. */
    @SuppressWarnings("unchecked")
    private static DataSource getDataSource(final Environment environment, final String dataSourceName) throws NamingException {
        Map<String, Object> dataSourceProps = PropertyUtil.handle(environment, String.join("", PREFIX, dataSourceName), Map.class);
        Preconditions.checkState(!dataSourceProps.isEmpty(), "Wrong datasource [%s] properties.", dataSourceName);
        if (dataSourceProps.containsKey(JNDI_NAME)) {
            return getJNDIDataSource(dataSourceProps.get(JNDI_NAME).toString());
        }
        return DataSourcePoolCreator.create(new DataSourceProperties(dataSourceProps.get(DATA_SOURCE_TYPE).toString(), PropertyUtil.getCamelCaseKeys(dataSourceProps)));
    }

    /** Build the properties of one data source from the environment without opening a pool (unless JNDI). */
    @SuppressWarnings("unchecked")
    private static DataSourceProperties getDataSourceProperties(final Environment environment, final String dataSourceName) throws NamingException {
        Map<String, Object> dataSourceProps = PropertyUtil.handle(environment, String.join("", PREFIX, dataSourceName), Map.class);
        Preconditions.checkState(!dataSourceProps.isEmpty(), "Wrong datasource [%s] properties.", dataSourceName);
        if (dataSourceProps.containsKey(JNDI_NAME)) {
            return DataSourcePropertiesCreator.create(getJNDIDataSource(dataSourceProps.get(JNDI_NAME).toString()));
        }
        return new DataSourceProperties(dataSourceProps.get(DATA_SOURCE_TYPE).toString(), PropertyUtil.getCamelCaseKeys(dataSourceProps));
    }

    /**
     * Build data source properties from a plain configuration map (the "sourceConfig"
     * of a dynamic entry), via JNDI when "jndi-name" is present.
     *
     * @param dataSourceProps raw key/value configuration, must contain "type" unless JNDI
     * @return data source properties
     * @throws NamingException when JNDI lookup fails
     */
    public static DataSourceProperties getDataSourceProperties(Map<String, Object> dataSourceProps) throws NamingException {
        if (dataSourceProps.containsKey(JNDI_NAME)) {
            return DataSourcePropertiesCreator.create(getJNDIDataSource(dataSourceProps.get(JNDI_NAME).toString()));
        }
        return new DataSourceProperties(dataSourceProps.get(DATA_SOURCE_TYPE).toString(), PropertyUtil.getCamelCaseKeys(dataSourceProps));
    }

    /**
     * Build a live data source from a plain configuration map.
     *
     * @param dataSourceProps raw key/value configuration, must contain "type" unless JNDI
     * @return pooled data source
     * @throws NamingException when JNDI lookup fails
     */
    public static DataSource getDataSource(Map<String, Object> dataSourceProps) throws NamingException {
        if (dataSourceProps.containsKey(JNDI_NAME)) {
            return getJNDIDataSource(dataSourceProps.get(JNDI_NAME).toString());
        }
        return DataSourcePoolCreator.create(new DataSourceProperties(dataSourceProps.get(DATA_SOURCE_TYPE).toString(), PropertyUtil.getCamelCaseKeys(dataSourceProps)));
    }

    /** Resolve a container-managed data source through JNDI, unwrapping any AOP proxy. */
    private static DataSource getJNDIDataSource(final String jndiName) throws NamingException {
        JndiObjectFactoryBean bean = new JndiObjectFactoryBean();
        bean.setResourceRef(true);
        bean.setJndiName(jndiName);
        bean.setProxyInterface(DataSource.class);
        bean.afterPropertiesSet();
        return (DataSource) AopProxyUtils.getTarget(bean.getObject());
    }
}
变动连接类 ShardingDataSourceProperties.setDataSource()
通过调用setDataSource方法来更新db连接
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.StopException;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.ListUtils;
import org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.EnvironmentAware;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
/**
* @author kittlen
* @version 1.0
* @date 2023/04/03 15:15
*/
@Slf4j
@Getter
@Component
@ApiModel("动态shardingJDBC添加数据源")
@ConditionalOnProperty(value = {"spring.shardingsphere.datasource.dynamicDataBase.useRegistryConfig", "spring.shardingsphere.enabled"}, havingValue = "true")
@ConfigurationProperties(ShardingDataSourceProperties.PROPERTIES)
public class ShardingDataSourceProperties implements EnvironmentAware {

    public static final String PROPERTIES = "sharding.dynamicdatasource";

    // Marker sourceName: the entry carries "other" settings (e.g. actualDataNodes)
    // instead of a db connection configuration.
    private static final String CONFIG_OTHER_CONFIG_KEY = "otherConfig";

    // Key of the fallback configuration applied when this instance's tag has no entry of its own.
    private static final String CONFIG_DEF_CONFIG_KEY = "defConfig";

    // Key inside the otherConfig entry that holds the new actual-data-nodes expression.
    private static final String OTHER_CONFIG_ACTUAL_DATA_NODES_KEY = "actualDataNodes";

    // Tag identifying this instance in a multi-instance deployment.
    @ApiModelProperty("当前实例的tag标记")
    @Value("${kittlen.service.thisTag:testservice-1}")
    private String thisTag;

    // Data sources declared in the local yml (spring.shardingsphere.datasource.*).
    @ApiModelProperty("本地的配置")
    private Map<String, DataSourceProperties> localProperties;

    @Resource
    ShardingSphereDataSource shardingSphereDataSource;

    // Druid pools whose connection failed: kept so their reconnect threads can be stopped.
    @ApiModelProperty("记录异常数据库连接配置类")
    private List<DruidDataSource> errorDataSources = new ArrayList<>();

    // Names applied during the previous reload; used to compute the drop set.
    @ApiModelProperty("连接配置的名称")
    private Set<String> dataSourceNames = new HashSet<>();

    // Bound configuration: instance tag -> data source entries.
    @ApiModelProperty("连接配置")
    private Map<String, List<MyDataSource>> dataSource = new LinkedHashMap<>();

    // Entries applied during the previous reload, keyed by sourceName; used to detect updates.
    @ApiModelProperty("当前配置的版本号信息")
    private Map<String, MyDataSource> oldDataSources = new LinkedHashMap<>();

    // Last actual-data-nodes expression that was successfully applied.
    @Value("${mysql.actual-data-nodes}")
    private String lastActualDataNodes;

    private void setLastActualDataNodes(String lastActualDataNodes) {
        this.lastActualDataNodes = lastActualDataNodes;
    }

    /**
     * Entry point invoked by the configuration binder whenever the dynamic data source
     * configuration changes. Computes the add/drop/update diff against the previously
     * applied configuration and pushes it into {@link ShardingSphereDataSource}.
     *
     * @param dataSource new configuration: instance tag -> data source entries
     * @throws Exception when the underlying context update fails outside the guarded path
     */
    public void setDataSource(Map<String, List<MyDataSource>> dataSource) throws Exception {
        log.info("加载动态dataSources");
        // Stop the reconnect threads of previously failed pools before applying new config.
        if (!CollectionUtils.isEmpty(errorDataSources)) {
            for (DruidDataSource errorDataSource : errorDataSources) {
                errorDataSource.connectionThreadStop();
            }
            // Fix: stopped pools no longer need tracking; without this the list grew
            // on every reload and already-stopped pools were re-stopped each time.
            errorDataSources.clear();
        }
        Set<String> localNames = localProperties.keySet();
        // Names applied last time minus names present now = names to drop.
        Set<String> dropNames = new HashSet<>(dataSourceNames);
        Map<String, DataSourceProperties> newSources = new HashMap<>();
        Map<String, DataSourceProperties> updateSources = new HashMap<>();
        String nowDataNodes = null;
        List<MyDataSource> thisDataSources = dataSource.get(thisTag);
        if (thisDataSources == null) {
            log.info("该实例:{}没有配置数据源,尝试获取所有实例配置", thisTag);
            // Fix: typed empty list instead of the raw-typed ListUtils.EMPTY_LIST.
            thisDataSources = new ArrayList<>();
        }
        List<MyDataSource> defConfig = dataSource.get(CONFIG_DEF_CONFIG_KEY);
        // Instance-specific entries win over defaults; duplicate sourceNames keep the last one.
        Map<String, MyDataSource> collect = thisDataSources.stream().collect(Collectors.toMap(MyDataSource::getSourceName, a -> a, (a, b) -> {
            log.warn("实例:{}配置的sourceName:{}重复", thisTag, a.getSourceName());
            return b;
        }));
        // Fix: defConfig may legitimately be absent — the original NPE'd here.
        if (defConfig != null) {
            for (MyDataSource myDataSource : defConfig) {
                collect.putIfAbsent(myDataSource.getSourceName(), myDataSource);
            }
        }
        for (MyDataSource myDataSource : collect.values()) {
            // The otherConfig entry only contributes the actual-data-nodes expression.
            if (CONFIG_OTHER_CONFIG_KEY.equals(myDataSource.getSourceName())) {
                Object o = myDataSource.getSourceConfig().get(OTHER_CONFIG_ACTUAL_DATA_NODES_KEY);
                if (o != null) {
                    nowDataNodes = o.toString();
                }
                continue;
            }
            String sourceName = myDataSource.getSourceName();
            if (StringUtil.isEmpty(sourceName)) {
                throw new MyException("sourceName is empty");
            }
            if (newSources.containsKey(sourceName)) {
                throw new MyException("sourceName 重复");
            }
            // Still present in the new configuration, so it is not dropped.
            dropNames.remove(myDataSource.getSourceName());
            MyDataSource oldDataSource = oldDataSources.get(myDataSource.getSourceName());
            if (oldDataSource == null) {
                if (localNames.contains(sourceName)) {
                    // Overrides a data source defined in the local yml: treat as an update.
                    updateSources.put(sourceName, MyShardingDataSource.getDataSourceProperties(myDataSource.getSourceConfig()));
                } else {
                    // Genuinely new data source.
                    newSources.put(sourceName, MyShardingDataSource.getDataSourceProperties(myDataSource.getSourceConfig()));
                }
            } else {
                if (!oldDataSource.equals(myDataSource)) {
                    // Config changed since the last reload: update; identical config is untouched.
                    updateSources.put(sourceName, MyShardingDataSource.getDataSourceProperties(myDataSource.getSourceConfig()));
                }
            }
        }
        if (!newSources.isEmpty() || !updateSources.isEmpty() || !dropNames.isEmpty()) {
            // Configurations are validated before they replace the old ones, so a broken
            // connection config does not disturb the currently working data sources.
            try {
                shardingSphereDataSource.updateContextMetaData(newSources, dropNames, updateSources, nowDataNodes);
                this.lastActualDataNodes = nowDataNodes;
            } catch (Exception e) {
                log.error("配置的数据库连接出现异常,异常为:{}", e.getMessage(), e);
                // Capture the failing Druid pool (possibly wrapped) so its reconnect
                // thread can be stopped on the next configuration push.
                if (e instanceof IllegalArgumentException) {
                    Throwable cause = e.getCause();
                    if (cause instanceof StopException) {
                        DruidDataSource errDataSource = ((StopException) cause).getErrDataSource();
                        errorDataSources.add(errDataSource);
                    }
                }
                if (e instanceof StopException) {
                    DruidDataSource errDataSource = ((StopException) e).getErrDataSource();
                    errorDataSources.add(errDataSource);
                }
            }
        } else {
            // No resource diff: only reload the node expression when it actually changed.
            if (!Objects.equals(nowDataNodes, lastActualDataNodes)) {
                shardingSphereDataSource.updateContextMetaData(null, null, null, nowDataNodes);
            }
            this.lastActualDataNodes = nowDataNodes;
        }
        // Remember the applied configuration for the next diff.
        this.dataSource = dataSource;
        this.dataSourceNames.clear();
        this.oldDataSources.clear();
        for (MyDataSource myDataSource : collect.values()) {
            if (!CONFIG_OTHER_CONFIG_KEY.equals(myDataSource.getSourceName())) {
                dataSourceNames.add(myDataSource.getSourceName());
                this.oldDataSources.put(myDataSource.getSourceName(), myDataSource);
            }
        }
        log.info("加载动态dataSources完毕");
    }

    @Override
    public void setEnvironment(Environment environment) {
        // Snapshot the yml-declared data sources so later pushes can tell "override" from "add".
        localProperties = MyShardingDataSource.getDataSourcePropertiesMap(environment);
    }

    @Getter
    @Setter
    @ApiModel("连接配置")
    @EqualsAndHashCode
    public static class MyDataSource {

        @ApiModelProperty("配置名")
        private String sourceName;

        // Raw key/value connection settings (type, url, username, pool options, ...).
        @ApiModelProperty("配置信息,key:配置名,value:配置值")
        private Map<String, Object> sourceConfig;
    }
}
动态db的yaml文件内容
sharding:
dynamicDataSource:
dataSource:
defConfig: #默认配置,如果有实例对应的otherConfig配置,则以实例的为主,没有的使用该配置
- sourceName: otherConfig
sourceConfig:
actualDataNodes: db-$->{0..2}
- sourceName: db-2 #必填项
sourceConfig:
type: com.alibaba.druid.pool.DruidDataSource
driverClassName: com.mysql.jdbc.Driver
url: jdbc:mysql://******
username: ***
password: ***
initialSize: 5
minIdle: 5
maxActive: 20
maxWait: 60000
timeBetweenEvictionRunsMillis: 60000
minEvictableIdleTimeMillis: 300000
validationQuery: select current_timestamp()
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
poolPreparedStatements: true
maxPoolPreparedStatementPerConnectionSize: 20
testservice-1: #实例id,用于多实例部署使用
- sourceName: otherConfig
sourceConfig:
actualDataNodes: db-$->{0..3}
- sourceName: db-2 #必填项
sourceConfig:
type: com.alibaba.druid.pool.DruidDataSource
driverClassName: com.mysql.jdbc.Driver
url: jdbc:mysql://******
username: ***
password: ***
initialSize: 5
minIdle: 5
maxActive: 20
maxWait: 60000
timeBetweenEvictionRunsMillis: 60000
minEvictableIdleTimeMillis: 300000
validationQuery: select current_timestamp()
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
poolPreparedStatements: true
maxPoolPreparedStatementPerConnectionSize: 20
- sourceName: db-3 #必填项
sourceConfig:
type: com.alibaba.druid.pool.DruidDataSource
driverClassName: com.mysql.jdbc.Driver
url: jdbc:mysql://******
username: ***
password: ***
initialSize: 5
minIdle: 5
maxActive: 20
maxWait: 60000
timeBetweenEvictionRunsMillis: 60000
minEvictableIdleTimeMillis: 300000
validationQuery: select current_timestamp()
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
poolPreparedStatements: true
maxPoolPreparedStatementPerConnectionSize: 20
更多推荐
所有评论(0)