原理: 通过扩展原ShardingSphereDataSource类增加updateContextMetaData方法来实现

        <dependency>
            <groupId>org.apache.shardingsphere</groupId>
            <artifactId>shardingsphere-jdbc-core-spring-boot-starter</artifactId>
            <version>5.1.2</version>
        </dependency>

spring sharding JDBC 动态调整数据库连接

通过重写ShardingSphereDataSource类来实现

代码

package org.apache.shardingsphere.driver.jdbc.core.datasource;

import com.alibaba.druid.pool.DruidDataSource;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.driver.jdbc.adapter.AbstractDataSourceAdapter;
import org.apache.shardingsphere.driver.jdbc.context.CachedDatabaseMetaData;
import org.apache.shardingsphere.driver.jdbc.context.JDBCContext;
import org.apache.shardingsphere.driver.state.DriverStateContext;
import org.apache.shardingsphere.infra.config.RuleConfiguration;
import org.apache.shardingsphere.infra.config.checker.RuleConfigurationCheckerFactory;
import org.apache.shardingsphere.infra.config.database.DatabaseConfiguration;
import org.apache.shardingsphere.infra.config.database.impl.DataSourceProvidedDatabaseConfiguration;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
import org.apache.shardingsphere.infra.config.scope.GlobalRuleConfiguration;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesCreator;
import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
import org.apache.shardingsphere.infra.instance.definition.InstanceType;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.infra.rule.builder.schema.DatabaseRulesBuilder;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderFactory;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
import org.apache.shardingsphere.sharding.api.config.rule.ShardingTableRuleConfiguration;
import org.apache.shardingsphere.spring.boot.ShardingSphereAutoConfiguration;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * @author kittlen
 * @version 1.0
 */
@Slf4j
public final class ShardingSphereDataSource extends AbstractDataSourceAdapter implements AutoCloseable {

    private final String databaseName;

    private final ContextManager contextManager;

    private final JDBCContext jdbcContext;

    private ContextManagerBuilderParameter contextManagerBuilderParameter;

    private ModeConfiguration modeConfiguration;

    private Collection<RuleConfiguration> ruleConfigs;

    private Properties properties;

    public ShardingSphereDataSource(final String databaseName, final ModeConfiguration modeConfig) throws SQLException {
        this.databaseName = databaseName;
        this.modeConfiguration = modeConfig;
        this.properties = new Properties();
        this.ruleConfigs = new LinkedList<>();
        contextManager = createContextManager(databaseName, modeConfig, new HashMap<>(), this.ruleConfigs, this.properties);
        jdbcContext = new JDBCContext(contextManager.getDataSourceMap(databaseName));
    }

    /**
     * {@link ShardingSphereAutoConfiguration#shardingSphereDataSource(org.springframework.beans.factory.ObjectProvider, org.springframework.beans.factory.ObjectProvider)}
     *
     * @param databaseName
     * @param modeConfig
     * @param dataSourceMap
     * @param ruleConfigs
     * @param props
     * @throws SQLException
     */
    public ShardingSphereDataSource(final String databaseName, final ModeConfiguration modeConfig, final Map<String, DataSource> dataSourceMap,
                                    final Collection<RuleConfiguration> ruleConfigs, final Properties props) throws SQLException {
        checkRuleConfiguration(databaseName, ruleConfigs);
        this.modeConfiguration = modeConfig;
        this.databaseName = databaseName;
        contextManager = createContextManager(databaseName, modeConfig, dataSourceMap, ruleConfigs, null == props ? new Properties() : props);
        this.ruleConfigs = ruleConfigs;
        this.properties = props;
        jdbcContext = new JDBCContext(contextManager.getDataSourceMap(databaseName));
    }

/**
     * 更新现有的数据库配置
     *
     * @param addDataSourcePropertiesMap 添加的配置
     * @param dropDataSourceNames        移除的配置
     * @param updateSources              修改的配置
     * @throws Exception todo
     */
    public void updateContextMetaData(final Map<String, DataSourceProperties> addDataSourcePropertiesMap, final Collection<String> dropDataSourceNames, Map<String, DataSourceProperties> updateSources, String newDataNodes) throws Exception {
        if (CollectionUtils.isEmpty(addDataSourcePropertiesMap) && CollectionUtils.isEmpty(dropDataSourceNames) && CollectionUtils.isEmpty(updateSources)) {
            if (!StringUtils.isEmpty(newDataNodes)) {
                this.rebuildShardingSphereRule(newDataNodes);
            }
            return;
        }
        Map<String, DataSource> oldDataSources = new HashMap<>(this.contextManager.getDataSourceMap(databaseName));
        if (!CollectionUtils.isEmpty(addDataSourcePropertiesMap)) {
            log.info("添加新的db:{}", JsonMapper.defaultMapper().toJson(addDataSourcePropertiesMap.keySet()));
            this.contextManager.addResource(this.databaseName, addDataSourcePropertiesMap);
        }
        if (!CollectionUtils.isEmpty(dropDataSourceNames)) {
            oldDateSourceDrop(dropDataSourceNames, oldDataSources);
        }
        if (!CollectionUtils.isEmpty(updateSources)) {
            log.info("更新db:{}", JsonMapper.defaultMapper().toJson(updateSources.keySet()));
            oldDateSourceDrop(dropDataSourceNames, oldDataSources);
            this.contextManager.addResource(this.databaseName, updateSources);
        }
        log.info("重新加载shardingRule");
        this.rebuildShardingSphereRule(newDataNodes);
        log.info("重新构建managerBuildParameter");
        Map<String, DataSource> dataSourceMap = this.contextManager.getDataSourceMap(databaseName);
        this.contextManagerBuilderParameter = this.builderParameter(this.databaseName, this.modeConfiguration, dataSourceMap, this.ruleConfigs, this.properties);
    }

    /**
     * 删除旧的连接配置
     *
     * @param dropDataSourceNames
     * @param oldDataSources
     */
    private void oldDateSourceDrop(Collection<String> dropDataSourceNames, Map<String, DataSource> oldDataSources) {
        log.info("移除旧的db:{}", JsonMapper.defaultMapper().toJson(dropDataSourceNames));
        this.contextManager.dropResource(this.databaseName, dropDataSourceNames);
        dropDataSourceNames.forEach(s -> {
            DataSource dataSource = oldDataSources.get(s);
            if (dataSource instanceof DruidDataSource) {
                try {
                    log.info("关闭原dataSources:{}", s);
                    ((DruidDataSource) dataSource).close();
                } catch (Exception e) {
                    log.warn("db:{}close时出现异常,异常为:{}", s, e.getMessage(), e);
                }
            }
        });
    }

    private Map<String, DataSourceProperties> getDataSourcePropertiesMap(final Map<String, DataSource> dataSourceMap) {
        Map<String, DataSourceProperties> result = new LinkedHashMap<>(dataSourceMap.size(), 1);
        for (Map.Entry<String, DataSource> each : dataSourceMap.entrySet()) {
            result.put(each.getKey(), DataSourcePropertiesCreator.create(each.getValue()));
        }
        return result;
    }

    @SuppressWarnings("unchecked")
    private void checkRuleConfiguration(final String databaseName, final Collection<RuleConfiguration> ruleConfigs) {
        ruleConfigs.forEach(each -> RuleConfigurationCheckerFactory.findInstance(each).ifPresent(optional -> optional.check(databaseName, each)));
    }

    private ContextManager createContextManager(final String databaseName, final ModeConfiguration modeConfig, final Map<String, DataSource> dataSourceMap,
                                                final Collection<RuleConfiguration> ruleConfigs, final Properties props) throws SQLException {
        ContextManagerBuilderParameter parameter = this.builderParameter(databaseName, modeConfig, dataSourceMap, ruleConfigs, props);
        this.contextManagerBuilderParameter = parameter;
        return ContextManagerBuilderFactory.getInstance(modeConfig).build(parameter);
    }

    private ContextManagerBuilderParameter builderParameter(final String databaseName, final ModeConfiguration modeConfig, final Map<String, DataSource> dataSourceMap,
                                                            final Collection<RuleConfiguration> ruleConfigs, final Properties props) {
        ContextManagerBuilderParameter parameter = ContextManagerBuilderParameter.builder()
                .modeConfig(modeConfig)
                .databaseConfigs(Collections.singletonMap(databaseName, new DataSourceProvidedDatabaseConfiguration(dataSourceMap, ruleConfigs)))
                .globalRuleConfigs(ruleConfigs.stream().filter(each -> each instanceof GlobalRuleConfiguration).collect(Collectors.toList()))
                .props(props)
                .instanceDefinition(new InstanceDefinition(InstanceType.JDBC)).build();
        return parameter;
    }

    private Optional<CachedDatabaseMetaData> createCachedDatabaseMetaData(final Map<String, DataSource> dataSources) throws SQLException {
        if (dataSources.isEmpty()) {
            return Optional.empty();
        }
        try (Connection connection = dataSources.values().iterator().next().getConnection()) {
            return Optional.of(new CachedDatabaseMetaData(connection.getMetaData()));
        }
    }

    @Override
    public Connection getConnection() throws SQLException {
        return DriverStateContext.getConnection(databaseName, contextManager, jdbcContext);
    }

    @Override
    public Connection getConnection(final String username, final String password) throws SQLException {
        return getConnection();
    }

    /**
     * Close data sources.
     *
     * @param dataSourceNames data source names to be closed
     * @throws Exception exception
     */
    public void close(final Collection<String> dataSourceNames) throws Exception {
        Map<String, DataSource> dataSourceMap = contextManager.getDataSourceMap(databaseName);
        for (String each : dataSourceNames) {
            close(dataSourceMap.get(each));
        }
        contextManager.close();
    }

    private void close(final DataSource dataSource) throws Exception {
        if (dataSource instanceof AutoCloseable) {
            ((AutoCloseable) dataSource).close();
        }
    }

    @Override
    public void close() throws Exception {
        close(contextManager.getDataSourceMap(databaseName).keySet());
    }

    @Override
    public int getLoginTimeout() throws SQLException {
        Map<String, DataSource> dataSourceMap = contextManager.getDataSourceMap(databaseName);
        return dataSourceMap.isEmpty() ? 0 : dataSourceMap.values().iterator().next().getLoginTimeout();
    }

    @Override
    public void setLoginTimeout(final int seconds) throws SQLException {
        for (DataSource each : contextManager.getDataSourceMap(databaseName).values()) {
            each.setLoginTimeout(seconds);
        }
    }

    public Set<String> getDataBaseNames() {
        return this.contextManager.getDataSourceMap(databaseName).keySet();
    }


    private void rebuildShardingSphereRule(String newDataNodes) {
        if (StringUtils.isEmpty(newDataNodes)) {
            return;
        }
        log.info("重新加载db映射:{}", newDataNodes);
        MetaDataPersistService metaDataPersistService = this.contextManager.getMetaDataContexts().getPersistService().get();
        Map<String, DatabaseConfiguration> databaseConfigurationMap = getDatabaseConfigMap(Stream.of(this.databaseName).collect(Collectors.toList()), metaDataPersistService, this.contextManagerBuilderParameter);
        databaseConfigurationMap.get(this.databaseName).getRuleConfigurations().forEach(rule -> {
            if (rule instanceof ShardingRuleConfiguration) {
                Collection<ShardingTableRuleConfiguration> tables = ((ShardingRuleConfiguration) rule).getTables();
                List<ShardingTableRuleConfiguration> newShardingTableRuleConfigurations = tables.stream().map(table -> {
                    String newActualDataNodes = newDataNodes + "." + table.getLogicTable();
                    ShardingTableRuleConfiguration shardingTableRuleConfiguration = new ShardingTableRuleConfiguration(table.getLogicTable(), newActualDataNodes);
                    shardingTableRuleConfiguration.setDatabaseShardingStrategy(table.getDatabaseShardingStrategy());
                    shardingTableRuleConfiguration.setTableShardingStrategy(table.getTableShardingStrategy());
                    shardingTableRuleConfiguration.setReplaceTablePrefix(table.getReplaceTablePrefix());
                    shardingTableRuleConfiguration.setKeyGenerateStrategy(table.getKeyGenerateStrategy());
                    return shardingTableRuleConfiguration;
                }).collect(Collectors.toList());
                ((ShardingRuleConfiguration) rule).setTables(newShardingTableRuleConfigurations);
            }
        });
        ConfigurationProperties props = new ConfigurationProperties(metaDataPersistService.getPropsService().load());
        Collection<ShardingSphereRule> build = DatabaseRulesBuilder.build(databaseName, databaseConfigurationMap.get(this.databaseName), props);
        List<ShardingSphereRule> oldRules = (List<ShardingSphereRule>) this.contextManager.getMetaDataContexts().getMetaData().getDatabases().get(databaseName).getRuleMetaData().getRules();
        oldRules.clear();
        oldRules.addAll(build);
    }

    private Map<String, DatabaseConfiguration> getDatabaseConfigMap(final Collection<String> databaseNames, final MetaDataPersistService metaDataPersistService,
                                                                    final ContextManagerBuilderParameter parameter) {
        Map<String, DatabaseConfiguration> result = new HashMap<>(databaseNames.size(), 1);
        databaseNames.forEach(each -> result.put(each, createDatabaseConfiguration(each, metaDataPersistService, parameter)));
        return result;
    }

    private DatabaseConfiguration createDatabaseConfiguration(final String databaseName, final MetaDataPersistService metaDataPersistService,
                                                              final ContextManagerBuilderParameter parameter) {
        Map<String, DataSource> dataSources = this.contextManager.getDataSourceMap(databaseName);
        Collection<RuleConfiguration> databaseRuleConfigs = metaDataPersistService.getDatabaseRulePersistService().load(databaseName);
        return new DataSourceProvidedDatabaseConfiguration(dataSources, databaseRuleConfigs);
    }
}


通过调用重写的DataSource的updateContextMetaData方法来重新加载连接配置

     * @param addDataSourcePropertiesMap 添加的配置
     * @param dropDataSourceNames        移除的配置 
     * @param updateSources              修改的配置
     * @param newDataNodes               新的节点使用配置
spring:
  shardingsphere:
    datasource:
      names: ${SHARDING_DATA_SOURCE_NAMES:db-0,db-1}
      db-0:
        type: com.alibaba.druid.pool.DruidDataSource
        driverClassName: com.mysql.jdbc.Driver
        url: jdbc:mysql://${mysql.message0.host}/${mysql.message0.database}?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf-8&autoReconnect=true&useSSL=false
        username: ${mysql.message0.username}
        password: ${mysql.message0.password}
        initialSize: 5 # 初始化大小
        minIdle: 5        # 最小
        maxActive: 20     # 最大
        maxWait: 60000    # 获取连接等待超时的时间
        ...
      db-1:
        type: com.alibaba.druid.pool.DruidDataSource
        driverClassName: com.mysql.jdbc.Driver
        ...

参数说明

| 参数名 | 说明 |
| --- | --- |
| addDataSourcePropertiesMap | 添加的db连接,key->连接id,例如配置中的db-0,db-1,value->根据普通db连接配置构建成的DataSourceProperties |
| dropDataSourceNames | 移除的db连接id集合,元素为连接id,例如配置中的db-0,db-1 |
| updateSources | 修改的db连接,key->连接id,例如配置中的db-0,db-1,value->根据普通db连接配置构建成的DataSourceProperties |
| newDataNodes | 新的节点使用配置,将配置项中使用的spring.shardingsphere.datasource.names值更换为该值 |

DataSourceProperties构建类

getDataSource(),传入的值为普通的db连接配置,例如

        type: com.alibaba.druid.pool.DruidDataSource
        driverClassName: com.mysql.jdbc.Driver
        url: jdbc:mysql://db3Ip/db3DB?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf-8&autoReconnect=true&useSSL=false
        username: db3User
        password: db3Pwd
        initialSize: 5 # 初始化大小
        minIdle: 5        # 最小
        maxActive: 20     # 最大
        maxWait: 60000    # 获取连接等待超时的时间
        timeBetweenEvictionRunsMillis: 60000    # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒(3600000:为1小时)
        minEvictableIdleTimeMillis: 300000      # 配置一个连接在池中最小生存的时间,单位是毫秒
        validationQuery: select current_timestamp() #SELECT 1 FROM DUAL  #用来检测连接是否有效的sql,要求是一个查询语句。如果validationQuery为null,testOnBorrow、testOnReturn、testWhileIdle都不会起作用。
        testWhileIdle: true   #申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效。建议配置为true,不影响性能,并且保证安全性。
        testOnBorrow: false   #申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。缺省值:true
        testOnReturn: false #归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。缺省值:false
        poolPreparedStatements: true    #打开PSCache,并且指定每个连接上PSCache的大小
        #是否缓存preparedStatement,也就是PSCache。PSCache对支持游标的数据库性能提升巨大,比如说oracle。在mysql5.5以下的版本中没有PSCache功能,建议关闭掉。5.5及以上版本有PSCache,建议开启。缺省值:false
        maxPoolPreparedStatementPerConnectionSize: 20   # 要启用PSCache,必须配置大于0,当大于0时,poolPreparedStatements自动触发修改为true。在Druid中,不会存在Oracle下PSCache占用内存过多的问题,可以把这个数值配置大一些,比如说100。

package com.kittlen.provider.config.sharding;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesCreator;
import org.apache.shardingsphere.infra.exception.ShardingSphereException;
import org.apache.shardingsphere.infra.expr.InlineExpressionParser;
import org.apache.shardingsphere.spring.boot.datasource.AopProxyUtils;
import org.apache.shardingsphere.spring.boot.util.PropertyUtil;
import org.springframework.core.env.Environment;
import org.springframework.core.env.StandardEnvironment;
import org.springframework.jndi.JndiObjectFactoryBean;
import org.springframework.stereotype.Component;

import javax.naming.NamingException;
import javax.sql.DataSource;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Static helpers that build {@link DataSource} and {@link DataSourceProperties} objects either from the
 * {@code spring.shardingsphere.datasource.*} properties of a Spring environment or from a plain property map.
 *
 * @author kittlen
 * @version 1.0
 */
public class MyShardingDataSource {
    private static final String PREFIX = "spring.shardingsphere.datasource.";

    private static final String DATA_SOURCE_NAME = "name";

    private static final String DATA_SOURCE_NAMES = "names";

    private static final String DATA_SOURCE_TYPE = "type";

    private static final String JNDI_NAME = "jndi-name";

    /**
     * Get data source map.
     *
     * @param environment spring boot environment
     * @return data source map
     */
    public static Map<String, DataSource> getDataSourceMap(final Environment environment) {
        Map<String, DataSource> result = new LinkedHashMap<>();
        for (String each : getDataSourceNames(environment)) {
            try {
                result.put(each, getDataSource(environment, each));
            } catch (final NamingException ex) {
                throw new ShardingSphereException("Can't find JNDI data source.", ex);
            }
        }
        return result;
    }

    /**
     * Get data source properties map.
     *
     * @param environment spring boot environment
     * @return data source properties keyed by data source name, in declaration order
     */
    public static Map<String, DataSourceProperties> getDataSourcePropertiesMap(final Environment environment) {
        Map<String, DataSourceProperties> result = new LinkedHashMap<>();
        for (String each : getDataSourceNames(environment)) {
            try {
                result.put(each, getDataSourceProperties(environment, each));
            } catch (final NamingException ex) {
                throw new ShardingSphereException("Can't find JNDI data source.", ex);
            }
        }
        return result;
    }

    /**
     * Reads the configured data source names: the {@code name} property wins, {@code names} is the fallback.
     * The value is expanded as an inline expression.
     */
    private static List<String> getDataSourceNames(final Environment environment) {
        // Only a StandardEnvironment offers this switch here; other Environment types are read as they are
        // instead of failing with a ClassCastException.
        if (environment instanceof StandardEnvironment) {
            ((StandardEnvironment) environment).setIgnoreUnresolvableNestedPlaceholders(true);
        }
        String dataSourceNames = environment.getProperty(PREFIX + DATA_SOURCE_NAME);
        if (Strings.isNullOrEmpty(dataSourceNames)) {
            dataSourceNames = environment.getProperty(PREFIX + DATA_SOURCE_NAMES);
        }
        return new InlineExpressionParser(dataSourceNames).splitAndEvaluate();
    }

    /**
     * Loads the property map of one data source from the environment and rejects an empty result.
     */
    @SuppressWarnings("unchecked")
    private static Map<String, Object> loadDataSourceProps(final Environment environment, final String dataSourceName) {
        Map<String, Object> dataSourceProps = PropertyUtil.handle(environment, String.join("", PREFIX, dataSourceName), Map.class);
        Preconditions.checkState(!dataSourceProps.isEmpty(), "Wrong datasource [%s] properties.", dataSourceName);
        return dataSourceProps;
    }

    private static DataSource getDataSource(final Environment environment, final String dataSourceName) throws NamingException {
        return getDataSource(loadDataSourceProps(environment, dataSourceName));
    }

    private static DataSourceProperties getDataSourceProperties(final Environment environment, final String dataSourceName) throws NamingException {
        return getDataSourceProperties(loadDataSourceProps(environment, dataSourceName));
    }


    /**
     * Builds the {@link DataSourceProperties} for one data source definition.
     *
     * <p>When the map contains {@code jndi-name} the properties are derived from the JNDI data source;
     * otherwise the map must contain {@code type} (the pool class name) plus the pool's own settings.
     *
     * @param dataSourceProps plain data source configuration, e.g. type, url, username, password
     * @return data source properties
     * @throws NamingException          if the JNDI lookup fails
     * @throws IllegalArgumentException if the map is {@code null}, empty, or lacks {@code type} for a non-JNDI definition
     */
    public static DataSourceProperties getDataSourceProperties(Map<String, Object> dataSourceProps) throws NamingException {
        checkNotEmpty(dataSourceProps);
        if (dataSourceProps.containsKey(JNDI_NAME)) {
            return DataSourcePropertiesCreator.create(getJNDIDataSource(dataSourceProps.get(JNDI_NAME).toString()));
        }
        return createPoolProperties(dataSourceProps);
    }


    /**
     * Creates a data source from one data source definition.
     *
     * <p>When the map contains {@code jndi-name} the data source is looked up via JNDI; otherwise a pool of the
     * class named by {@code type} is created from the remaining settings.
     *
     * @param dataSourceProps plain data source configuration, e.g. type, url, username, password
     * @return data source
     * @throws NamingException          if the JNDI lookup fails
     * @throws IllegalArgumentException if the map is {@code null}, empty, or lacks {@code type} for a non-JNDI definition
     */
    public static DataSource getDataSource(Map<String, Object> dataSourceProps) throws NamingException {
        checkNotEmpty(dataSourceProps);
        if (dataSourceProps.containsKey(JNDI_NAME)) {
            return getJNDIDataSource(dataSourceProps.get(JNDI_NAME).toString());
        }
        return DataSourcePoolCreator.create(createPoolProperties(dataSourceProps));
    }

    /**
     * Rejects a missing or empty definition with a readable message instead of a bare NullPointerException.
     */
    private static void checkNotEmpty(final Map<String, Object> dataSourceProps) {
        Preconditions.checkArgument(null != dataSourceProps && !dataSourceProps.isEmpty(), "Data source properties must not be empty.");
    }

    /**
     * Builds pool properties from a non-JNDI definition; the keys are converted to camel case.
     */
    private static DataSourceProperties createPoolProperties(final Map<String, Object> dataSourceProps) {
        Object type = dataSourceProps.get(DATA_SOURCE_TYPE);
        Preconditions.checkArgument(null != type, "Data source properties must contain [%s].", DATA_SOURCE_TYPE);
        return new DataSourceProperties(type.toString(), PropertyUtil.getCamelCaseKeys(dataSourceProps));
    }


    /**
     * Looks up a data source through JNDI (as a resource reference) and unwraps the proxy Spring creates for it.
     */
    private static DataSource getJNDIDataSource(final String jndiName) throws NamingException {
        JndiObjectFactoryBean bean = new JndiObjectFactoryBean();
        bean.setResourceRef(true);
        bean.setJndiName(jndiName);
        bean.setProxyInterface(DataSource.class);
        bean.afterPropertiesSet();
        return (DataSource) AopProxyUtils.getTarget(bean.getObject());
    }
}

变动连接类 ShardingDataSourceProperties.setDataSource()

通过调用setDataSource方法来更新db连接

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.StopException;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.ListUtils;
import org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.EnvironmentAware;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Configuration-properties bean bound to {@code sharding.dynamicdatasource}. Each time the {@code dataSource}
 * property is (re)bound, {@link #setDataSource(Map)} works out which data sources were added, changed or removed
 * and applies the difference to the {@link ShardingSphereDataSource}.
 *
 * @author kittlen
 * @version 1.0
 * @date 2023/04/03 15:15
 */
@Slf4j
@Getter
@Component
@ApiModel("动态shardingJDBC添加数据源")
@ConditionalOnProperty(value = {"spring.shardingsphere.datasource.dynamicDataBase.useRegistryConfig", "spring.shardingsphere.enabled"}, havingValue = "true")
@ConfigurationProperties(ShardingDataSourceProperties.PROPERTIES)
public class ShardingDataSourceProperties implements EnvironmentAware {


    public static final String PROPERTIES = "sharding.dynamicdatasource";

    /**
     * Source name marking an entry that carries other settings rather than a data source definition.
     */
    private static final String CONFIG_OTHER_CONFIG_KEY = "otherConfig";
    /**
     * Key of the default configuration shared by all instances.
     */
    private static final String CONFIG_DEF_CONFIG_KEY = "defConfig";

    /**
     * Key inside the "other settings" entry that holds the new data nodes expression.
     */
    private static final String OTHER_CONFIG_ACTUAL_DATA_NODES_KEY = "actualDataNodes";

    /**
     * Tag of the current instance (custom configuration).
     */
    @ApiModelProperty("当前实例的tag标记")
    @Value("${kittlen.service.thisTag:testservice-1}")
    private String thisTag;

    /**
     * Data sources configured locally under {@code spring.shardingsphere.datasource}.
     */
    @ApiModelProperty("本地的配置")
    private Map<String, DataSourceProperties> localProperties;

    @Resource
    ShardingSphereDataSource shardingSphereDataSource;

    /**
     * Pools whose configuration failed; they are stopped at the start of the next refresh.
     */
    @ApiModelProperty("记录异常数据库连接配置类")
    private List<DruidDataSource> errorDataSources = new ArrayList<>();

    /**
     * Names of the data sources seen in the previous refresh.
     */
    @ApiModelProperty("连接配置的名称")
    private Set<String> dataSourceNames = new HashSet<>();

    /**
     * Data source configuration bound from YAML, keyed by instance tag (or {@code defConfig}).
     * The original comment also sketched a per-object tag field ({@code useObjectTag}) that is not implemented.
     */
    @ApiModelProperty("连接配置")
    private Map<String, List<MyDataSource>> dataSource = new LinkedHashMap<>();

    @ApiModelProperty("当前配置的版本号信息")
    private Map<String, MyDataSource> oldDataSources = new LinkedHashMap<>();

    /**
     * Data nodes expression that was applied most recently.
     */
    @Value("${mysql.actual-data-nodes}")
    private String lastActualDataNodes;

    private void setLastActualDataNodes(String lastActualDataNodes) {
        this.lastActualDataNodes = lastActualDataNodes;
    }

    /**
     * Applies a new dynamic data source configuration.
     *
     * <p>The entries of the current instance tag are merged with {@code defConfig} (instance entries win), compared
     * with the previous refresh and the local configuration, and the resulting additions, updates and removals are
     * pushed to the sharding data source. A failure while pushing data source changes is logged, not rethrown.
     *
     * @param dataSource configuration keyed by instance tag, as bound from the properties
     * @throws Exception if an entry is invalid or the data-nodes-only update fails
     */
    public void setDataSource(Map<String, List<MyDataSource>> dataSource) throws Exception {
        log.info("加载动态dataSources");
        // A pool with a broken configuration keeps trying to connect; stop those before applying new settings.
        stopErrorDataSources();
        Set<String> localNames = localProperties.keySet();
        // Removed = everything known from the previous refresh minus everything still present now.
        Set<String> dropNames = new HashSet<>(dataSourceNames);
        Map<String, DataSourceProperties> newSources = new HashMap<>();
        Map<String, DataSourceProperties> updateSources = new HashMap<>();

        String nowDataNodes = null;
        Map<String, MyDataSource> collect = mergeInstanceAndDefaultConfig(dataSource);
        for (MyDataSource myDataSource : collect.values()) {
            String sourceName = myDataSource.getSourceName();
            // The "other settings" entry only carries the data nodes expression.
            if (CONFIG_OTHER_CONFIG_KEY.equals(sourceName)) {
                Map<String, Object> otherConfig = myDataSource.getSourceConfig();
                Object o = otherConfig == null ? null : otherConfig.get(OTHER_CONFIG_ACTUAL_DATA_NODES_KEY);
                if (o != null) {
                    nowDataNodes = o.toString();
                }
                continue;
            }
            if (StringUtil.isEmpty(sourceName)) {
                throw new MyException("sourceName is empty");
            }
            if (newSources.containsKey(sourceName)) {
                throw new MyException("sourceName 重复");
            }
            // Still configured, so it must not be dropped.
            dropNames.remove(sourceName);
            MyDataSource oldDataSource = oldDataSources.get(sourceName);
            if (oldDataSource == null) {
                // Not seen in the previous refresh: it overrides a local data source of the same name,
                // otherwise it is a brand-new one.
                if (localNames.contains(sourceName)) {
                    updateSources.put(sourceName, MyShardingDataSource.getDataSourceProperties(myDataSource.getSourceConfig()));
                } else {
                    newSources.put(sourceName, MyShardingDataSource.getDataSourceProperties(myDataSource.getSourceConfig()));
                }
            } else if (!oldDataSource.equals(myDataSource)) {
                // Seen before but changed; identical entries are left alone.
                updateSources.put(sourceName, MyShardingDataSource.getDataSourceProperties(myDataSource.getSourceConfig()));
            }
        }
        if (!newSources.isEmpty() || !updateSources.isEmpty() || !dropNames.isEmpty()) {
            try {
                shardingSphereDataSource.updateContextMetaData(newSources, dropNames, updateSources, nowDataNodes);
                this.lastActualDataNodes = nowDataNodes;
            } catch (Exception e) {
                log.error("配置的数据库连接出现异常,异常为:{}", e.getMessage(), e);
                recordErrorDataSource(e);
                // NOTE(review): the bookkeeping below still records this configuration as applied, so an
                // unchanged broken configuration is not retried on the next refresh - confirm this is intended.
            }
        } else {
            if (!Objects.equals(nowDataNodes, lastActualDataNodes)) {
                shardingSphereDataSource.updateContextMetaData(null, null, null, nowDataNodes);
            }
            this.lastActualDataNodes = nowDataNodes;
        }
        this.dataSource = dataSource;
        this.dataSourceNames.clear();
        this.oldDataSources.clear();
        for (MyDataSource myDataSource : collect.values()) {
            if (!CONFIG_OTHER_CONFIG_KEY.equals(myDataSource.getSourceName())) {
                dataSourceNames.add(myDataSource.getSourceName());
                this.oldDataSources.put(myDataSource.getSourceName(), myDataSource);
            }
        }
        log.info("加载动态dataSources完毕");
    }

    /**
     * Stops the connection threads of all recorded broken pools and forgets them, so the list does not grow
     * and the same pools are not stopped again on every refresh.
     */
    private void stopErrorDataSources() {
        if (CollectionUtils.isEmpty(errorDataSources)) {
            return;
        }
        for (DruidDataSource errorDataSource : errorDataSources) {
            errorDataSource.connectionThreadStop();
        }
        errorDataSources.clear();
    }

    /**
     * Merges the entries configured for this instance with the default entries, keyed by source name.
     * Instance entries take precedence; a missing instance or default section is treated as empty.
     */
    private Map<String, MyDataSource> mergeInstanceAndDefaultConfig(Map<String, List<MyDataSource>> dataSource) {
        List<MyDataSource> thisDataSources = dataSource.get(thisTag);
        if (thisDataSources == null) {
            log.info("该实例:{}没有配置数据源,尝试获取所有实例配置", thisTag);
            thisDataSources = new ArrayList<>();
        }
        Map<String, MyDataSource> merged = thisDataSources.stream().collect(Collectors.toMap(MyDataSource::getSourceName, a -> a, (a, b) -> {
            log.warn("实例:{}配置的sourceName:{}重复", thisTag, a.getSourceName());
            return b;
        }));
        List<MyDataSource> defConfig = dataSource.get(CONFIG_DEF_CONFIG_KEY);
        if (defConfig != null) {
            for (MyDataSource myDataSource : defConfig) {
                merged.putIfAbsent(myDataSource.getSourceName(), myDataSource);
            }
        }
        return merged;
    }

    /**
     * Remembers the pool reported by a {@link StopException}, whether it was thrown directly or wrapped as the
     * cause of an {@link IllegalArgumentException}.
     */
    private void recordErrorDataSource(Exception e) {
        Throwable candidate = e instanceof IllegalArgumentException ? e.getCause() : e;
        if (candidate instanceof StopException) {
            DruidDataSource errDataSource = ((StopException) candidate).getErrDataSource();
            if (errDataSource != null) {
                errorDataSources.add(errDataSource);
            }
        }
    }

    @Override
    public void setEnvironment(Environment environment) {
        localProperties = MyShardingDataSource.getDataSourcePropertiesMap(environment);
    }

    /**
     * One named configuration entry; equality over both fields is used to detect changes between refreshes.
     */
    @Getter
    @Setter
    @ApiModel("连接配置")
    @EqualsAndHashCode
    public static class MyDataSource {

        @ApiModelProperty("配置名")
        private String sourceName;


        @ApiModelProperty("配置信息,key:配置名,value:配置值")
        private Map<String, Object> sourceConfig;

    }
}

动态db的yaml文件内容


sharding:
  dynamicDataSource:
    dataSource:
      defConfig: #默认配置,如果有实例对应的otherConfig配置,则以实例的为主,没有的使用该配置
        - sourceName: otherConfig
          sourceConfig:
            actualDataNodes: db-$->{0..2}
        - sourceName: db-2 #必填项
          sourceConfig:
            type: com.alibaba.druid.pool.DruidDataSource
            driverClassName: com.mysql.jdbc.Driver
            url: jdbc:mysql://******
            username: ***
            password: ***
            initialSize: 5
            minIdle: 5
            maxActive: 20
            maxWait: 60000
            timeBetweenEvictionRunsMillis: 60000
            minEvictableIdleTimeMillis: 300000
            validationQuery: select current_timestamp()
            testWhileIdle: true
            testOnBorrow: false
            testOnReturn: false
            poolPreparedStatements: true
            maxPoolPreparedStatementPerConnectionSize: 20
      testservice-1: #实例id,用于多实例部署使用
        - sourceName: otherConfig
          sourceConfig:
            actualDataNodes: db-$->{0..3}
        - sourceName: db-2 #必填项
          sourceConfig:
            type: com.alibaba.druid.pool.DruidDataSource
            driverClassName: com.mysql.jdbc.Driver
            url: jdbc:mysql://******
            username: ***
            password: ***
            initialSize: 5
            minIdle: 5
            maxActive: 20
            maxWait: 60000
            timeBetweenEvictionRunsMillis: 60000
            minEvictableIdleTimeMillis: 300000
            validationQuery: select current_timestamp()
            testWhileIdle: true
            testOnBorrow: false
            testOnReturn: false
            poolPreparedStatements: true
            maxPoolPreparedStatementPerConnectionSize: 20
        - sourceName: db-3 #必填项
          sourceConfig:
            type: com.alibaba.druid.pool.DruidDataSource
            driverClassName: com.mysql.jdbc.Driver
            url: jdbc:mysql://******
            username: ***
            password: ***
            initialSize: 5
            minIdle: 5
            maxActive: 20
            maxWait: 60000
            timeBetweenEvictionRunsMillis: 60000
            minEvictableIdleTimeMillis: 300000
            validationQuery: select current_timestamp()
            testWhileIdle: true
            testOnBorrow: false
            testOnReturn: false
            poolPreparedStatements: true
            maxPoolPreparedStatementPerConnectionSize: 20
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐