自定义 LoadBalancer 实现 FeignClient 的自定义路由
项目背景
服务 A 由多位同事同时开发,每位同事都会在 dev 或 test 环境发布自己分支的代码,因此注册中心(本文以 Nacos 为例)里同一个服务会同时存在多个来自不同分支的实例。此时通过 Feign 调用该服务,请求可能被负载均衡到不同分支的实例上,导致行为不一致、难以联调。本文的重点就是解决这个问题。
实操
解决方案
/**
 * Load-balancer client configuration that registers {@link MyRoundRobinLoadBalancer}
 * as the {@link ReactorLoadBalancer} for a service.
 *
 * @author authorZhao
 * @since 2023-08-03
 */
@EnableConfigurationProperties(LoadBalancerProperties.class)
public class LoadBalanceConfig {

    /**
     * Builds the load balancer for the service whose id is stored under
     * {@link LoadBalancerClientFactory#PROPERTY_NAME} in the given environment.
     *
     * @param environment environment the service id is read from
     * @param loadBalancerClientFactory factory used to obtain the lazy
     * {@link ServiceInstanceListSupplier} provider for that service
     * @return a {@link MyRoundRobinLoadBalancer} bound to the resolved service id
     */
    @Bean
    @ConditionalOnMissingBean
    public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment,
            LoadBalancerClientFactory loadBalancerClientFactory) {
        final String serviceId = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
        // Resolved lazily so the supplier is only looked up when an instance is actually chosen.
        var instanceSuppliers = loadBalancerClientFactory.getLazyProvider(serviceId,
                ServiceInstanceListSupplier.class);
        return new MyRoundRobinLoadBalancer(instanceSuppliers, serviceId);
    }

}
// Marker class that carries the annotations wiring LoadBalanceConfig into the application.
// The annotations are left commented out here; NOTE(review): presumably exactly one of the
// two @LoadBalancerClient(s) variants is meant to be enabled - confirm before use.
//@Configuration(proxyBeanMethods = false)
//@LoadBalancerClients(defaultConfiguration = LoadBalanceConfig.class) // global configuration
//@LoadBalancerClient(value = "user-center-service",configuration = LoadBalanceConfig.class) // configuration for a single service
public class LoadBalance {}
/*** A Round-Robin-based implementation of {@link ReactorServiceInstanceLoadBalancer}.** @author Spencer Gibb* @author Olga Maciaszek-Sharma* @author Zhuozhi JI*/
public class MyRoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalancer {private static final Log log = LogFactory.getLog(CubeRoundRobinLoadBalancer.class);final AtomicInteger position;final String serviceId;ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;/*** @param serviceInstanceListSupplierProvider a provider of* {@link ServiceInstanceListSupplier} that will be used to get available instances* @param serviceId id of the service for which to choose an instance*/public CubeRoundRobinLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,String serviceId) {this(serviceInstanceListSupplierProvider, serviceId, new Random().nextInt(1000));}/*** @param serviceInstanceListSupplierProvider a provider of* {@link ServiceInstanceListSupplier} that will be used to get available instances* @param serviceId id of the service for which to choose an instance* @param seedPosition Round Robin element position marker*/public CubeRoundRobinLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,String serviceId, int seedPosition) {this.serviceId = serviceId;this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;this.position = new AtomicInteger(seedPosition);}@SuppressWarnings("rawtypes")@Override// see original// https://github.com/Netflix/ocelli/blob/master/ocelli-core/// src/main/java/netflix/ocelli/loadbalancer/RoundRobinLoadBalancer.javapublic Mono<Response<ServiceInstance>> choose(Request request) {ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);return supplier.get(request).next().map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));}private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,List<ServiceInstance> serviceInstances) {Response<ServiceInstance> serviceInstanceResponse = 
getInstanceResponse(serviceInstances);if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());}return serviceInstanceResponse;}private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {for (ServiceInstance instance : instances) {if(instance instanceof NacosServiceInstance nacosServiceInstance){//项目注册的时候在配置一个元数据,后面可以根据当前上下文拿到当前tag,进行匹配,默认使用base分支String s = instance.getMetadata().get("feature-tag");if("base".equals(s)){return new DefaultResponse(nacosServiceInstance);}}}if (instances.isEmpty()) {if (log.isWarnEnabled()) {log.warn("No servers available for service: " + serviceId);}return new EmptyResponse();}// Ignore the sign bit, this allows pos to loop sequentially from 0 to// Integer.MAX_VALUEint pos = this.position.incrementAndGet() & Integer.MAX_VALUE;ServiceInstance instance = instances.get(pos % instances.size());return new DefaultResponse(instance);}}
loadbalance执行过程
调用流程
![调用流程](img/image-20230824160138272.png)
feign.SynchronousMethodHandler#executeAndDecode
->feign.Client#execute
->FeignBlockingLoadBalancerClient#execute
/**
 * Executes a Feign request against an instance picked by the load balancer.
 *
 * <p>The host of the request URL is used as the service id. Lifecycle processors are
 * notified via {@code onStart}, then {@code loadBalancerClient.choose} selects an instance.
 * When no instance is returned, the processors are completed with status {@code DISCARD}
 * and a 503 (SERVICE_UNAVAILABLE) response carrying the warning message is returned.
 * Otherwise the URL is rebuilt for the chosen instance and the call is handed to
 * {@code executeWithLoadBalancerLifecycleProcessing}.
 *
 * @param request the original Feign request; its URL host is the service id
 * @param options request options forwarded to the delegated execution
 * @return the delegate's response, or a 503 response when no instance is available
 * @throws IOException declared by the signature; presumably propagated from the delegated
 * execution - confirm against the delegate
 */
public Response execute(Request request, Request.Options options) throws IOException {
    final URI originalUri = URI.create(request.url());
    String serviceId = originalUri.getHost();
    Assert.state(serviceId != null, "Request URI does not contain a valid hostname: " + originalUri);
    String hint = getHint(serviceId);
    DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest<>(
            new RequestDataContext(buildRequestData(request), hint));
    Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator
            .getSupportedLifecycleProcessors(
                    // Author's note: loadBalancerClientFactory is a subclass of NamedContextFactory
                    // (a map of containers); this call refreshes the child container of the
                    // target service, e.g. user-center-service.
                    loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
                    RequestDataContext.class, ResponseData.class, ServiceInstance.class);
    supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
    // Author's note: looks up the ReactiveLoadBalancer by serviceId and runs its choose
    // method, which is where the custom load balancer gets invoked.
    ServiceInstance instance = loadBalancerClient.choose(serviceId, lbRequest);
    org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse = new DefaultResponse(
            instance);
    if (instance == null) {
        // No instance available: log, notify the lifecycle processors, and answer with 503.
        String message = "Load balancer does not contain an instance for the service " + serviceId;
        if (LOG.isWarnEnabled()) {
            LOG.warn(message);
        }
        supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
                .onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
                        CompletionContext.Status.DISCARD, lbRequest, lbResponse)));
        return Response.builder().request(request).status(HttpStatus.SERVICE_UNAVAILABLE.value())
                .body(message, StandardCharsets.UTF_8).build();
    }
    // Rewrite the URL so that it points at the chosen instance instead of the service id.
    String reconstructedUrl = loadBalancerClient.reconstructURI(instance, originalUri).toString();
    Request newRequest = buildRequest(request, reconstructedUrl);
    LoadBalancerProperties loadBalancerProperties = loadBalancerClientFactory.getProperties(serviceId);
    return executeWithLoadBalancerLifecycleProcessing(delegate, options, newRequest, lbRequest, lbResponse,
            supportedLifecycleProcessors, loadBalancerProperties.isUseRawStatusCodeInResponseData());
}
上面的 MyRoundRobinLoadBalancer 其实是照搬 RoundRobinLoadBalancer 的实现:只是在选择实例前增加了一步按 feature-tag 元数据匹配 base 分支的逻辑,匹配不到时仍然走默认的轮询。
完
本文为原创,转载请注明出处