Spring

由于运维特殊性,我们没有使用配置中心,仅仅只是使用了Nacos作为注册中心。业务场景对我们提出了需求,动态更新网关路由信息而不重启应用。考虑之下,我们选择了从数据库读取网关路由配置,更新配置到gateway应用。

我们先后经历2个版本,
一是直接实现RouteDefinitionRepository接口;
二是更新路由配置信息到GatewayProperties bean,通过RefreshRoutesEvent刷新路由配置信息

实现RouteDefinitionRepository接口

源码分析

org.springframework.cloud.gateway.config.GatewayAutoConfiguration

	@Bean
	@ConditionalOnMissingBean(RouteDefinitionRepository.class)
	public InMemoryRouteDefinitionRepository inMemoryRouteDefinitionRepository() {
		return new InMemoryRouteDefinitionRepository();
	}

在gateway初始化配置类中,发现默认是使用内存策略实现网关路由源,方法上面还有个@ConditionalOnMissingBean的注解,当实现RouteDefinitionRepository接口,会优先使用它的实现类。所以根据这点,第一版的实现,就是通过实现RouteDefinitionRepository接口。根据默认的处理策略,系统会每隔30秒,调用一次RouteDefinitionRepository#getRouteDefinitions获取路由配置信息,更新到网关处理逻辑里。

实现

数据库设计,把配置文件的值,复制到数据库表对应的字段即可。

CREATE TABLE t_route_config (
  route_id varchar(50) NOT NULL,
  route_name varchar(200) DEFAULT NULL COMMENT '名称',
  uri varchar(200) DEFAULT NULL COMMENT '网关url',
  predicates text COMMENT '网关断言',
  filters text COMMENT '网关过滤器',
  metadata text COMMENT '元数据信息',
  route_order text COMMENT '规则顺序',
  sys_create_time timestamp NULL DEFAULT NULL COMMENT '创建时间',
  sys_update_time timestamp NULL DEFAULT NULL COMMENT '修改时间',
  sys_status int(11) DEFAULT NULL COMMENT '数据标识',
  sys_remark varchar(500) DEFAULT NULL COMMENT '备注',
  PRIMARY KEY (route_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

实现类

/**
 * @author huangliuyu
 * @date 2022-08-23
 * @description
 */
@Data
public class RouteConfig implements Serializable {
    private String routeId;
    private String routeName;
    private String uri;
    private String predicates;
    private String filters;
    private String metadata;
    private Integer routeOrder;
    //更新时间
    private LocalDateTime sysUpdateTime;
}

RouteDefinitionRepository实现类,实现getRouteDefinitions方法,获取网关路由配置,通过@Repository注册到Spring Bean就可以加载处理了。

@Repository
public class OldDatabaseRouteDefinitionRepository implements RouteDefinitionRepository {
    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Override
    public Flux<RouteDefinition> getRouteDefinitions() {
        List<RouteDefinition> routeDefinitions = this.getRouteConfigs();
        return Flux.fromIterable(routeDefinitions);
    }

    public List<RouteDefinition> getRouteConfigs() {
        String sql = "select * from t_route_config t where t.sys_status=0";
        List<RouteConfig> rules = jdbcTemplate.query(sql, new BeanPropertyRowMapper<>(RouteConfig.class));
        if (null == rules || rules.size() <= 0) {
            return Collections.EMPTY_LIST;
        }

        List<RouteDefinition> routeDefinitions = new ArrayList<>();
        for (RouteConfig rule : rules) {
            RouteDefinition routeDefinition = new RouteDefinition();
            routeDefinition.setId(rule.getRouteId());
            routeDefinition.setUri(URI.create(rule.getUri()));
            routeDefinition.setPredicates(this.getPredicates(rule.getPredicates()));
            routeDefinition.setFilters(this.getFilters(rule.getFilters()));
            routeDefinition.setMetadata(this.getMetadata(rule.getMetadata()));
            Integer ruleOrder = rule.getRouteOrder();
            if (null != ruleOrder) {
                routeDefinition.setOrder(ruleOrder);
            }
            routeDefinitions.add(routeDefinition);
        }
        return routeDefinitions;
    }


    private List<PredicateDefinition> getPredicates(String text) {
        if (StringUtils.isBlank(text)) {
            return Collections.EMPTY_LIST;
        }
        Yaml yaml = new Yaml();
        List<String> predicateList = yaml.loadAs(text, List.class);
        List<PredicateDefinition> predicateDefinitions = new ArrayList<>();
        for (String predicate : predicateList) {
            if (StringUtils.isBlank(predicate)) {
                continue;
            }
            PredicateDefinition definition = new PredicateDefinition(predicate);
            predicateDefinitions.add(definition);
        }
        return predicateDefinitions;
    }

    private List<FilterDefinition> getFilters(String text) {
        if (StringUtils.isBlank(text)) {
            return Collections.EMPTY_LIST;
        }
        Yaml yaml = new Yaml();
        List<String> filterList = yaml.loadAs(text, List.class);
        List<FilterDefinition> filterDefinitions = new ArrayList<>();
        for (String filter : filterList) {
            if (StringUtils.isBlank(filter)) {
                continue;
            }
            FilterDefinition definition = new FilterDefinition(filter);
            filterDefinitions.add(definition);
        }
        return filterDefinitions;
    }

    private Map<String, Object> getMetadata(String text) {
        if (StringUtils.isBlank(text)) {
            return Collections.EMPTY_MAP;
        }
        Yaml yaml = new Yaml();
        return yaml.loadAs(text, Map.class);
    }

通过RefreshRoutesEvent更新

在后期开发从数据库中读取cors跨域配置功能时,发现可以通过RefreshRoutesEvent刷新网关路由配置信息

源码分析

CachingRouteLocator实现ApplicationListener,监听RefreshRoutesEvent事件,当有RefreshRoutesEvent出现,处理以下逻辑更新网关路由。
org.springframework.cloud.gateway.route.CachingRouteLocator#onApplicationEvent

	private final RouteLocator delegate;
	
	private Flux<Route> fetch() {
		return this.delegate.getRoutes().sort(AnnotationAwareOrderComparator.INSTANCE);
	}

	@Override
	public void onApplicationEvent(RefreshRoutesEvent event) {
		try {
			fetch().collect(Collectors.toList()).subscribe(
					list -> Flux.fromIterable(list).materialize().collect(Collectors.toList()).subscribe(signals -> {
						applicationEventPublisher.publishEvent(new RefreshRoutesResultEvent(this));
						cache.put(CACHE_KEY, signals);
					}, this::handleRefreshError), this::handleRefreshError);
		}
		catch (Throwable e) {
			handleRefreshError(e);
		}
	}

由CachingRouteLocator#fetch方法,一直往下看

CompositeRouteLocator是RouteLocator的实现类
org.springframework.cloud.gateway.route.CompositeRouteLocator#getRoutes

@Override
	public Flux<Route> getRoutes() {
		return this.delegates.flatMapSequential(RouteLocator::getRoutes);
	}

org.springframework.cloud.gateway.route.RouteDefinitionRouteLocator#getRoutes

public Flux<Route> getRoutes() {
		Flux<Route> routes = this.routeDefinitionLocator.getRouteDefinitions().map(this::convertToRoute);
		//todo ...
		return routes.map(route -> {
			if (logger.isDebugEnabled()) {
				logger.debug("RouteDefinition matched: " + route.getId());
			}
			return route;
		});
	}

RouteDefinitionRouteLocator就是各类RouteDefinitionRepository实现的接口,获取网关路由配置,当然默认的InMemoryRouteDefinitionRepository也不例外。

所以这里,想到改造原来获取网关路由的策略。

实现

public class RouteCorsConfig {
    @Autowired
    private GatewayProperties gatewayProperties;
    @Autowired
    private ApplicationEventPublisher publisher;
    @Autowired
    private RouteConfigRepository routeConfigRepository;
    @Autowired
    private GatewayProfileRepository profileRepository;
    private Integer routeVersion = 0;

    /**
     * 刷新路由配置
     */
    private void refreshRouteConfig() {
    	//这里原来的处理逻辑,获取List<RouteDefinition>,不过不用实现RouteDefinitionRepository接口了。
        List<RouteDefinition> routeDefinitions = routeConfigRepository.getRouteConfigs();
        //更新gatewayProperties中的路由配置信息
        gatewayProperties.setRoutes(routeDefinitions);
        //推送RefreshRoutesEvent事件
        publisher.publishEvent(new RefreshRoutesEvent(this));
        log.info("完成刷新网关路由配置 总数 {}", routeDefinitions.size());
    }

	//这里是更新路由的策略,大家根据自己的情况来就好,定时刷新
	@Scheduled(cron = "0/30 * * * * ?")
    @PostConstruct
    public void refreshConfig() {
        //用了一张表存储配置版本号
        //读取环境属性
        Map<String, String> profileMap = profileRepository.getProfile();
        //路由
        Integer routeVer = MapUtils.getInteger(profileMap, "ROUTE_VERSION", 0);
        //当数据库中版本号,大于内存中版本号时更新路由配置
        if (routeVer > routeVersion) {
            log.info("开始刷新网关路由配置 version={}", routeVer);
            this.refreshRouteConfig();
            routeVersion = routeVer;
        }
        //cors跨域
        //todo ....
    }
}
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐