1. 功能目的
通过设置请求头的方式将http请求优先打到指定的服务上,为微服务开发调试工作提供便利
- 请求报文难模拟:可以直接在测试环境页面上操作,流量直接打到本地IDEA进行debug
- 请求链路较长:本地开发无需启动所有服务,仅需要启动目标服务
- 协同开发:与其他人一同开发,并且依赖对方开发的接口,可以直接将自己本地服务的请求发到对方本地服务上
这是目前主要的使用目的,当然也可以调整负载均衡逻辑,配合网关的一些自定义配置,扩充为灰度发布的效果
2. 实现原理
通过在网关以及Ribbon,实现自定义的负载均衡策略,将请求引流到本地。
PS:需要服务器能访问本地,需要类似OpenVPN这样赋予本地一个IP,供服务器网关能请求到本地
开启OpenVPN之后本机会有多个IP,通过配置指定注册到nacos上的IP
spring.cloud.nacos.discovery.network-interface: 10.0
2.1 本地服务调整
本地启动时,配置文件添加参数,设置一个元数据作为服务的流量标识
比如mdm服务配置参数
spring.cloud.nacos.discovery.metadata.request-mark: azhuzhu
服务启动之后,我们可以在nacos的服务列表里看到元数据
2.2 网关负载均衡
服务分类:
- 服务器服务:metadata中没有 requestMark 参数
- 本地服务:metadata中带有 requestMark 参数
网关实现自定义负载均衡策略:
- 判断请求头中是否带有本地流量标识:requestMark
- 有标识:判断有无metadata匹配的服务实例
- 有:调用匹配的服务实例
- 无:判断目标请求有没有服务器服务实例
- 有:服务器服务随机数负载
- 无:可用实例随机负载
- 无标识:判断目标请求有没有服务器服务实例
- 有:服务器服务随机数负载
- 无:可用实例随机负载
网关负载处理了第一目标服务,假如调用链路为 mdm -> commom-api,我们启动的是common-api,则需要在服务端的ribbon中做负载
- 有标识:判断有无metadata匹配的服务实例
2.3 请求发起
比如,约定request-mark作为本地流量标识
请求头 request-mark=azhuzhu 表示流量优先打到带有元数据 request-mark=azhuzhu 的服务
- 浏览器操作:通过浏览器插件ModHeader,在浏览器发起请求时,带上请求头
- HTTP工具:带上自定义请求头
3. 具体代码介绍
以下所有注册的bean, 都通过指定的配置参数开启
@ConditionalOnProperty(name = "hg.request-mark.enable", havingValue = "true")
3.1 自定义负载均衡器
网关及其他客户端的流量染色具体的负载逻辑实现
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.reactive.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.reactive.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.reactive.Request;
import org.springframework.cloud.client.loadbalancer.reactive.Response;
import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClients;
import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.ReactorLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import reactor.core.publisher.Mono;import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;/*** 自定义负载均衡器 用于开发环境的流量染色** @author 阿猪 2024-08-09 11:45*/
@Slf4j
@Configuration
@SuppressWarnings("deprecation")
@ConditionalOnProperty(name = "hg.request-mark.enable", havingValue = "true")
@LoadBalancerClients(defaultConfiguration = {ReqMarkLoadBalancer.class})
public class ReqMarkLoadBalancer {public static final String REQUEST_MARK = "request-mark";/*** 开启流量染色时 替换默认的负载器** @param environment 环境信息* @param loadBalancerClientFactory 负载器工厂* @return 自定义负载器*/@Bean@Primarypublic ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment,LoadBalancerClientFactory loadBalancerClientFactory) {String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);return new RandomLoadBalancer(loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);}static class RandomLoadBalancer implements ReactorServiceInstanceLoadBalancer {private final ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;private final String serviceId;public RandomLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,String serviceId) {this.serviceId = serviceId;this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;}@Overridepublic Mono<Response<ServiceInstance>> choose(Request request) {ServiceInstanceListSupplier supplier =serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);String requestMark = getRequestMark(request);return supplier.get().next().map(serviceInstances -> processInstanceResponseByReqMark(serviceInstances, requestMark));}private String getRequestMark(Request<?> request) {// 客户端的负载 直接从 RequestContextHolder 拿请求头ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();if (attributes != null) {return attributes.getRequest().getHeader(REQUEST_MARK);}// 网关的负载从 request 取值(网关覆盖了默认实现 把context塞进去了 不然拿到是 null 跟spring boot版本有关系)if (!(request.getContext() instanceof ServerHttpRequest)) {return null;}ServerHttpRequest context = (ServerHttpRequest) request.getContext();if (context.getHeaders().containsKey(REQUEST_MARK)) {return context.getHeaders().getFirst(REQUEST_MARK);}return null;}/*** 默认的随机数负载** @param instances 可用服务实例* @return 命中实例*/private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {if (CollectionUtils.isEmpty(instances)) {log.warn("No instance available {}", serviceId);return new EmptyResponse();}Random random = new Random();ServiceInstance instance = instances.get(random.nextInt(instances.size()));return new DefaultResponse(instance);}private Response<ServiceInstance> processInstanceResponseByReqMark(List<ServiceInstance> instances, String requestMark) {if (instances.isEmpty()) {return new EmptyResponse();}ServiceInstance sameClusterNameInst = selectInstanceByReqMark(instances, requestMark);return new DefaultResponse(sameClusterNameInst);}private ServiceInstance selectInstanceByReqMark(List<ServiceInstance> instances, String requestMark) {// 元数据不带请求标识的服务, 标识为服务器上的服务List<ServiceInstance> serverInstances = instances.stream().filter(instance -> {Map<String, String> metadata = instance.getMetadata();return MapUtils.isEmpty(metadata) || !metadata.containsKey(REQUEST_MARK);}).collect(Collectors.toList());if (StringUtils.isBlank(requestMark)) {if (CollectionUtils.isEmpty(serverInstances)) {return instances.get(new Random().nextInt(instances.size()));}return serverInstances.get(new Random().nextInt(serverInstances.size()));}List<ServiceInstance> matchInstances = Lists.newArrayList();for (ServiceInstance instance : instances) {Map<String, String> metadata = instance.getMetadata();if (MapUtils.isEmpty(metadata)) {continue;}if (metadata.containsKey(REQUEST_MARK) && requestMark.equals(metadata.get(REQUEST_MARK))) {matchInstances.add(instance);}}Random random = new Random();// 优先匹配到的服务 最后是随机if (CollectionUtils.isNotEmpty(matchInstances)) {return matchInstances.get(random.nextInt(matchInstances.size()));}// 然后是无标识服务(服务器上的服务)if (CollectionUtils.isNotEmpty(serverInstances)) {return serverInstances.get(random.nextInt(serverInstances.size()));}// 前两者都没有就随机获取return instances.get(random.nextInt(instances.size()));}}}
3.2 流量标识请求头透传
这里使用Feign进行内部服务调用,需要将原请求的流量标识 请求头继续传递下去,保证后续的服务链路也能有流量染色的效果。
import feign.RequestInterceptor;
import feign.RequestTemplate;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;/*** 流量染色 - 流量标识请求头透传** @author 阿猪 2024-09-25 17:11*/
@Component
@ConditionalOnProperty(name = "hg.request-mark.enable", havingValue = "true")
public class ReqMarkRequestInterceptor implements RequestInterceptor {@Overridepublic void apply(RequestTemplate requestTemplate) {// 从 request 中获取流量标识, 设置到 feign 的 requestTemplate中ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();if (attributes != null) {requestTemplate.header(ReqMarkLoadBalancer.REQUEST_MARK, attributes.getRequest().getHeader(ReqMarkLoadBalancer.REQUEST_MARK));}}}
3.3 网关负载均衡-请求信息获取
由于这个方案中,负载均衡是依靠 请求头 判断的,详见上面请求头的获取ReqMarkLoadBalancer.getRequestMark
在spring boot 2.3.2 版本中 request.getContext是个空的,没法获取请求信息
2.6.x 后面没有这个问题,但需要关注下这个context的类型,调整代码
以下是覆盖默认实现,为 request 的 context 设置请求信息。
实际上是复制 ReactiveLoadBalancerClientFilter的源码稍作修改,看倒数最后两行
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.reactive.DefaultRequest;
import org.springframework.cloud.client.loadbalancer.reactive.Request;
import org.springframework.cloud.client.loadbalancer.reactive.Response;
import org.springframework.cloud.gateway.config.LoadBalancerProperties;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.ReactiveLoadBalancerClientFilter;
import org.springframework.cloud.gateway.support.DelegatingServiceInstance;
import org.springframework.cloud.gateway.support.NotFoundException;
import org.springframework.cloud.loadbalancer.core.ReactorLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;import java.net.URI;import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.*;/*** 覆盖默认的负载均衡器 改了choose方法 将request信息传入** @see ReactiveLoadBalancerClientFilter* @author 阿猪 2024-08-09 17:06*/
@Slf4j
@Component
@ConditionalOnProperty(name = "hg.request-mark.enable", havingValue = "true")
public class CustomLoadBalancerClientFilter extends ReactiveLoadBalancerClientFilter {private static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10150;private final LoadBalancerClientFactory clientFactory;private final LoadBalancerProperties properties;public CustomLoadBalancerClientFilter(LoadBalancerClientFactory clientFactory,LoadBalancerProperties properties) {super(null, null);this.clientFactory = clientFactory;this.properties = properties;}@Overridepublic int getOrder() {return LOAD_BALANCER_CLIENT_FILTER_ORDER;}@Override@SuppressWarnings("Duplicates")public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);if (url == null|| (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {return chain.filter(exchange);}// preserve the original urladdOriginalRequestUrl(exchange, url);if (log.isTraceEnabled()) {log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName()+ " url before: " + url);}return choose(exchange).doOnNext(response -> {if (!response.hasServer()) {throw NotFoundException.create(properties.isUse404(),"Unable to find instance for " + url.getHost());}ServiceInstance retrievedInstance = response.getServer();URI uri = exchange.getRequest().getURI();// if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,// if the loadbalancer doesn't provide one.String overrideScheme = retrievedInstance.isSecure() ? "https" : "http";if (schemePrefix != null) {overrideScheme = url.getScheme();}DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(retrievedInstance, overrideScheme);URI requestUrl = reconstructURI(serviceInstance, uri);if (log.isTraceEnabled()) {log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);}exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);}).then(chain.filter(exchange));}@SuppressWarnings("deprecation")private Mono<Response<ServiceInstance>> choose(ServerWebExchange exchange) {URI uri = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);ReactorLoadBalancer<ServiceInstance> loadBalancer = this.clientFactory.getInstance(uri.getHost(), ReactorServiceInstanceLoadBalancer.class);if (loadBalancer == null) {throw new NotFoundException("No loadbalancer available for " + uri.getHost());}// 就改了这里 仅调整参数传入 保持原有逻辑(原代码传入了空的request)Request<?> request = new DefaultRequest<>(exchange.getRequest());return loadBalancer.choose(request);}}
将bean覆盖掉,替换掉原本的 ReactiveLoadBalancerClientFilter
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.gateway.config.LoadBalancerProperties;
import org.springframework.cloud.gateway.filter.ReactiveLoadBalancerClientFilter;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 流量染色 - 将ReactiveLoadBalancerClientFilter覆盖掉 为了获取到http请求头** @author 阿猪 2024-08-09 17:12*/
@Configuration
public class CustomLoadBalancerConfig {@Bean@ConditionalOnProperty(name = "hg.request-mark.enable", havingValue = "true")public ReactiveLoadBalancerClientFilter gatewayLoadBalancerClientFilter(LoadBalancerClientFactory clientFactory,LoadBalancerProperties properties){return new CustomLoadBalancerClientFilter(clientFactory, properties);}}