Java在SpringCloud中自定义Gateway负载均衡策略

Java在SpringCloud中自定义Gateway负载均衡策略

一、前言

spring-cloud-starter-netflix-ribbon已经不再更新了,最新版本是2.2.10.RELEASE,最后更新时间是2021年11月18日,详细信息可以看maven官方仓库:org.springframework.cloud/spring-cloud-starter-netflix-ribbon,SpringCloud官方推荐使用spring-cloud-starter-loadbalancer进行负载均衡。

背景:大文件上传做切片文件上传;

流程:将切片文件上传到服务器,然后进行合并任务,合并完成之后上传到对象存储;现在服务搞成多节点以后,网关默认走轮循,但是相同的服务在不同的机器上,这样就会导致切片文件散落在不同的服务器上,会导致文件合并失败;所以根据一个标识去自定义gateway对应服务的负载均衡策略,可以解决这个问题;

我的版本如下:

<spring-boot.version>2.7.3</spring-boot.version>
        <spring-cloud.version>2021.0.4</spring-cloud.version>
        <spring-cloud-alibaba.version>2021.0.4.0</spring-cloud-alibaba.version>

二、参考默认实现

springCloud原生默认的负载均衡策略是这个类:

org.springframework.cloud.loadbalancer.core.RoundRobinLoadBalancer

我们参考这个类实现自己的负载均衡策略即可,RoundRobinLoadBalancer实现了ReactorServiceInstanceLoadBalancer这个接口,实现了choose这个方法,如下图:

在choose方法中调用了processInstanceResponse方法,processInstanceResponse方法中调用了getInstanceResponse方法,所以我们我们可以复制RoundRobinLoadBalancer整个类,只修改getInstanceResponse这个方法里的内容就可以实现自定义负载均衡策略。

三、实现代码

原理:根据请求头当中设备的唯一标识传递到下游,唯一标识做哈希取余,可以指定对应的服务器节点,需要的服务设置自定义负载策略,不需要的服务设置默认的轮循机制即可

package com.wondertek.gateway.loadBalancer;import cn.hutool.core.util.ObjectUtil;
import com.wondertek.web.exception.enums.HttpRequestHeaderEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
@Slf4j
@Component
public class RequestFilter implements GlobalFilter, Ordered {@Overridepublic int getOrder() {// 应该小于LoadBalancerClientFilter的顺序值return Ordered.HIGHEST_PRECEDENCE;}@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {ServerHttpRequest request = exchange.getRequest();String clientDeviceUniqueCode = request.getHeaders().getFirst(HttpRequestHeaderEnum.CLIENT_DEVICE_UNIQUE_CODE.getCode());// 存入Reactor上下文String resultCode = clientDeviceUniqueCode;return chain.filter(exchange).contextWrite(context -> {if (ObjectUtil.isNotEmpty(resultCode)) {log.info("开始将request中的唯一标识封装到上下游中:{}", resultCode);return context.put("identification", resultCode);} else {// 或者根据需求进行其他处理return context;}});}
}
package com.wondertek.gateway.loadBalancer;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.Request;
import org.springframework.cloud.client.loadbalancer.Response;
import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.SelectedInstanceCallback;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import reactor.core.publisher.Mono;import java.util.*;
import java.util.concurrent.ThreadLocalRandom;@Slf4j
public class ClientDeviceUniqueCodeInstanceLoadBalancer implements ReactorServiceInstanceLoadBalancer {private final String serviceId;private ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;public ClientDeviceUniqueCodeInstanceLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId) {this.serviceId = serviceId;this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;}@Overridepublic Mono<Response<ServiceInstance>> choose(Request request) {//在 choose 方法中,使用 deferContextual 方法来访问上下文并提取客户端标识。这里的 getOrDefault 方法尝试从上下文中获取一个键为 "identification" 的值,如果不存在则返回 "default-identification"return Mono.deferContextual(contextView -> {String identification = contextView.getOrDefault("identification", "14d58a1ba286f087d9736249ec785314");log.info("上下游获取到的identification的值为:{}", identification);ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);return supplier.get(request).next().map(serviceInstances -> processInstanceResponse(supplier, serviceInstances, identification));});}private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier, List<ServiceInstance> serviceInstances, String identification) {Response<ServiceInstance> serviceInstanceResponse;if (Objects.isNull(identification)) {serviceInstanceResponse = this.getInstanceResponse(serviceInstances, null);} else {serviceInstanceResponse = this.getIpInstanceResponse(serviceInstances, identification);}if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {((SelectedInstanceCallback) supplier).selectedServiceInstance((ServiceInstance) serviceInstanceResponse.getServer());}return serviceInstanceResponse;}private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances, String identification) {if (instances.isEmpty()) {if (log.isWarnEnabled()) {log.warn("No servers available for service: " + this.serviceId);}return new EmptyResponse();} else {int index = ThreadLocalRandom.current().nextInt(instances.size());ServiceInstance instance = (ServiceInstance) instances.get(index);return new DefaultResponse(instance);}}private Response<ServiceInstance> getIpInstanceResponse(List<ServiceInstance> instances, String identification) {if (instances.isEmpty()) {log.warn("No servers available for service: " + this.serviceId);return new EmptyResponse();} else if (instances.size() == 1) {log.info("只有一个服务实例,直接返回这个实例");return new DefaultResponse(instances.get(0));} else {//创建一个新的列表以避免在原始列表上排序,避免了修改共享状态可能带来的线程安全问题List<ServiceInstance> sortedInstances = new ArrayList<>(instances);// 现在对新列表进行排序,保持原始列表的顺序不变Collections.sort(sortedInstances, Comparator.comparing(ServiceInstance::getHost));//log.info("获取到的实例个数的值为:{}", sortedInstances.size());sortedInstances.forEach(instance -> log.info("排序后的实例: {},{}", instance.getHost(), instance.getPort()));//log.info("多个服务实例,使用客户端 identification 地址的哈希值来选择服务实例");// 使用排序后的列表来找到实例int ipHashCode = Math.abs(identification.hashCode());//log.info("identificationHashCode的值为:{}", ipHashCode);int instanceIndex = ipHashCode % sortedInstances.size();//log.info("instanceIndex的值为:{}", instanceIndex);ServiceInstance instanceToReturn = sortedInstances.get(instanceIndex);//log.info("instanceToReturn.getUri()的值为:{}", instanceToReturn.getUri());log.info("自定义identification负载机制,Client identification: {} is routed to instance: {}:{}", identification, instanceToReturn.getHost(), instanceToReturn.getPort());return new DefaultResponse(instanceToReturn);}}}
package com.wondertek.gateway.loadBalancer;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.Request;
import org.springframework.cloud.client.loadbalancer.Response;
import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.SelectedInstanceCallback;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import reactor.core.publisher.Mono;import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;@Slf4j
public class DefaultInstanceLoadBalancer implements ReactorServiceInstanceLoadBalancer {private final String serviceId;private ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;final AtomicInteger position;public DefaultInstanceLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId, AtomicInteger position) {this.serviceId = serviceId;this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;this.position = position;}@Overridepublic Mono<Response<ServiceInstance>> choose(Request request) {ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);return supplier.get(request).next().map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));}private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,List<ServiceInstance> serviceInstances) {Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances);if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());}return serviceInstanceResponse;}private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {if (instances.isEmpty()) {if (log.isWarnEnabled()) {log.warn("No servers available for service: " + serviceId);}return new EmptyResponse();}//创建一个新的列表以避免在原始列表上排序,避免了修改共享状态可能带来的线程安全问题List<ServiceInstance> sortedInstances = new ArrayList<>(instances);// 现在对新列表进行排序,保持原始列表的顺序不变Collections.sort(sortedInstances, Comparator.comparing(ServiceInstance::getHost));//log.info("获取到的实例个数的值为:{}", sortedInstances.size());sortedInstances.forEach(instance -> log.info("排序后的实例: {},{}", instance.getHost(), instance.getPort()));int pos = Math.abs(this.position.incrementAndGet());//log.info("默认轮循机制,pos递加后的值为:{}", pos);int positionIndex = pos % instances.size();//log.info("取余后的positionIndex的值为:{}", positionIndex);ServiceInstance instance = instances.get(positionIndex);//log.info("instance.getUri()的值为:{}", instance.getUri());log.info("默认轮循机制,routed to instance: {}:{}",instance.getHost(), instance.getPort());return new DefaultResponse(instance);}}
package com.wondertek.gateway.loadBalancer;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClient;
import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClients;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;import java.util.concurrent.atomic.AtomicInteger;@Configuration
//单台服务
//@LoadBalancerClient(name = "oms-api", configuration = CustomLoadBalancerConfig.class)
//多台服务
@LoadBalancerClients({@LoadBalancerClient(name = "oms-api", configuration = CustomLoadBalancerConfig.class),@LoadBalancerClient(name = "unity-api", configuration = CustomLoadBalancerConfig.class),@LoadBalancerClient(name = "cloud-api", configuration = CustomLoadBalancerConfig.class),@LoadBalancerClient(name = "open-api", configuration = CustomLoadBalancerConfig.class),@LoadBalancerClient(name = "server-api", configuration = CustomLoadBalancerConfig.class),@LoadBalancerClient(name = "center-service", configuration = CustomLoadBalancerConfig.class),
})
@Slf4j
public class CustomLoadBalancerConfig {// 定义一个Bean来提供AtomicInteger的实例@Beanpublic AtomicInteger positionTracker() {// 这将在应用上下文中只初始化一次return new AtomicInteger(0);}//自定义优先级负载均衡器@Beanpublic ReactorServiceInstanceLoadBalancer customPriorityLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,Environment environment,AtomicInteger positionTracker) {String serviceId = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);//目的为解决文件上传切片文件分散上传的问题if ("oms-api".equals(serviceId)||"unity-api".equals(serviceId)||"cloud-api".equals(serviceId)){//log.info("服务名称:serviceId:{},走自定义clientDeviceUniqueCode负载模式", serviceId);return new ClientDeviceUniqueCodeInstanceLoadBalancer(serviceInstanceListSupplierProvider, serviceId);}//log.info("服务名称:serviceId:{},走默认负载模式", serviceId);return new DefaultInstanceLoadBalancer(serviceInstanceListSupplierProvider, serviceId,positionTracker);}
}

【SpringCloud系列】开发环境下重写Loadbalancer实现自定义负载均衡

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/226003.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue3+elementPlus:el-drawer新增修改弹窗复用

在el-drawer的属性里设置:title属性&#xff0c;和重置函数 //html<!-- 弹窗 --><el-drawerv-model"drawer":title"title":size"505":direction"direction":before-close"handleClose"><el-formlabel-posit…

linux如何清理磁盘,使得数据难以恢复

sda 是硬盘&#xff0c;sda1 和 sda2 是硬盘的两个分区。centos-root 是一个逻辑卷&#xff0c;挂载在根目录 /。 /dev/sda 是硬盘&#xff0c;/dev/sda1 和 /dev/sda2 是硬盘的两个分区。 [rootnode2 ~]# dd if/dev/urandom of/dev/sda bs4M这个命令将从 /dev/urandom 读取随…

WPF实战项目二十二(客户端):首页添加备忘录与待办事项

1、在View文件夹下新建文件夹Dialog&#xff0c;新建View&#xff1a;AddMemoView、AddToDoView <UserControlx:Class"WPFProject.Views.Dialogs.AddToDoView"xmlns"http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:x"http://s…

Pandas教程(三)—— 数据清洗与准备

1.处理缺失值 1.1 数据删除函数 作用&#xff1a;删除Dataframe某行或某列的数据 语法&#xff1a;df.drop&#xff08; labels [ ] &#xff09; drop函数的几个参数&#xff1a; labels &#xff1a;接收一个列表&#xff0c;内含删除行 / 列的索引编号或索引名 axis &…

20231222给NanoPC-T4(RK3399)开发板的适配原厂Android10的挖掘机方案并跑通AP6398SV

20231222给NanoPC-T4(RK3399)开发板的适配原厂Android10的挖掘机方案并跑通AP6398SV 1、简略步骤&#xff1a;rootrootrootroot-X99-Turbo:~/3TB/3399-android10$ cat Rockchip_Android10.0_SDK_Release.tar.gz0* > Rockchip_Android10.0_SDK_Release.tar.gz rootrootrootro…

python pip安装依赖的常用软件源

目录 引言 一、什么是镜像源&#xff1f;​​​​​​​ 二、清华源 三、阿里源 四、中科大源 五、豆瓣源 六、更多资源 引言 在软件开发和使用过程中&#xff0c;我们经常需要下载和更新各种软件包和库文件。然而&#xff0c;由于网络环境的限制或者服务器的负载&#…

Vue3中说说Tree shaking特性?举例说明一下?

提起Vue3里面的Tree shaking时候&#xff0c;需要提到它是通过构建工具和模块导入方式实现的。然后我们再说说Tree shaking是什么和作用 一、通过构建工具和模块导入方式实现 1.配置构建工具&#xff1a; 在 Vue 3 项目中&#xff0c;通常使用 webpack 进行构建。为了启用 T…

百度CTO王海峰:文心一言用户规模破1亿

▶ 写在前面▶ 飞桨开发者已达1070万▶ 文心一言用户规模破亿&#xff0c;日提问量快速增长 ▶ 写在前面 “文心一言用户规模突破 1 亿。”12 月 28日&#xff0c;百度首席技术官、深度学习技术及应用国家工程研究中心主任王海峰在第十届 WAVE SUMMIT 深度学习开发者大会上宣布…

数据结构--查找

目录 1. 查找的基本概念 2. 线性表的查找 3. 树表的查找 3.1 二叉排序树 3.1.1 定义: 3.1.2 存储结构&#xff1a; 3.1.3 二叉排序树的查找 3.1.4 二叉排序树的插入 3.1.5 二叉排序树删除 3.2 平衡二叉树&#xff08;AVL 3.2.1 为什么要有平衡二叉树 3.2.2 定义 3.3 B-树 3.3.1…

【《设计模式之美》】如何取舍继承与组合

文章目录 什么情况下不推荐使用继承&#xff1f;组合相比继承有哪些优势&#xff1f;使用组合、继承的时机 本文主要想了解&#xff1a; 为什么组合优于继承&#xff0c;多用组合少用继承。如何使用组合来替代继承哪些情况适用继承、组合。有哪些设计模式使用到了继承、组合。 …

【Spring Security】快速入门之案例实操

目录 一、简介 1、什么是安全框架 2、主流的安全框架 3、为什么使用Spring Security 二、引言 1、什么是Spring Security 2、Spring Security工作原理 3、特点 三、快速入门 1、引入依赖 2、配置 3、启动测试 4、配置自定义账号密码 四、Web安全配置类 1.HttpSe…

c++输入输出流和文件操作总结

目录 一、c的输入输出流——> 指的是字节流的数据传送;具有类型安全和可扩展性。 二、流的出入路径 三、c流类库 ①概览 ②标准输出流&#xff1a; ③标准输入流&#xff1a; 四、文件操作&#xff08;ascii文件和二进制文件&#xff09; 五、字符串流&#xff08;或称…

企业数据可视化-亿发数据化管理平台提供商,实现一站式数字化运营

近些年来&#xff0c;国内企业数据化管理升级进程持续加速&#xff0c;以物联网建设、人工智能、大数据和5G网络等新技术的发展&#xff0c;推动了数字经济的蓬勃发展&#xff0c;成为维持经济持续稳定增长的重要引擎。如今许多国内中小型企业纷纷摒弃传统管理模式&#xff0c;…

Json和Xml

一、前言 学习心得&#xff1a;C# 入门经典第8版书中的第21章《Json和Xml》 二、Xml的介绍 Xml的含义&#xff1a; 可标记性语言&#xff0c;它将数据以一种特别简单文本格式储存。让所有人和几乎所有的计算机都能理解。 XML文件示例&#xff1a; <?xml version"1.…

AGV智能搬运机器人-替代人工工位让物流行业降本增效

在当今快速发展的世界中&#xff0c;物流业面临着巨大的挑战&#xff0c;包括提高效率、降低成本和优化工作流程。为了应对这些挑战&#xff0c;一种新型的自动化设备——智能搬运机器人正在崭露头角。本文将通过一个具体的案例来展示富唯智能转运机器人在实际应用中的价值。 案…

Flask登陆后登陆状态及密码的修改和处理

web/templates/common 是统一布局 登录成功 后flask框架服务器默认由login.html进入仪表盘页面index.html(/),该页面的设置在 (web/controllers/user/index.py)&#xff0c;如果想在 该仪表盘页面 将 用户信息 展示出来&#xff0c;就得想办法先获取到 当前用户的 登陆状态。…

【项目】玩具租赁博客测试报告

目录 一、项目背景 二、项目功能 三、功能测试 一、项目背景 玩具租赁系统采用前后端分离的方法来实现&#xff0c;同时使用了数据库来存储相关的数据&#xff0c;同时将其部署到云服务器上。前端主要有十五个页面构成&#xff1a;用户注册、管理员注册、登录页、用户和管理…

1.倒排索引 2.逻辑斯提回归算法

1.倒排索引 https://help.aliyun.com/zh/open-search/retrieval-engine-edition/introduction-to-inverted-indexes 倒排索引&#xff08;Inverted Index&#xff09;是一种数据结构&#xff0c;用于快速查找包含某个特定词或词语的文档。它主要用于全文搜索引擎等应用&#…

Flask 与微信小程序对接

Flask 与微信小程序的对接 在 web/controllers/api中增建py文件&#xff0c;主要是给微信小程序使用的。 web/controllers/init.py # -*- coding: utf-8 -*- from flask import Blueprint route_api Blueprint( api_page,__name__ )route_api.route("/") def ind…

移动端Vant中的Calendar日历增加显示农历(节日、节气)功能

核心&#xff1a; 使用 js-calendar-converter 库实现 npm地址&#xff1a;js-calendar-converter 内部使用原生calendar.js&#xff0c; 中国农历&#xff08;阴阳历&#xff09;和西元阳历即公历互转JavaScript库&#xff0c;具体实现感兴趣的可自行查看其实现源码。 原日…