SpringCloud Gateway默认不支持将请求路由到一个服务的多个端口
本文将结合Gateway的处理流程,提供一些解决思路
需求背景
公司有一个IM项目,对外暴露了两个端口8081和8082,8081是springboot启动使用的端口,对外提供一些http接口,如获取聊天群组成员列表等,相关配置如下
spring: application: name: im
server:port: 8081servlet:context-path: /im
8082是内部启动了一个netty服务器,处理websocket请求,实现即时通讯
......
......
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
......
......
bootstrap.bind(8082).sync();
以本地环境为例,直连IM项目的请求地址如下:
http://localhost:8081/im/*** (请求http接口)
ws://localhost:8082/ws(进行websocket协议升级)
项目组希望客户端能统一走网关,实现对这两种接口的调用
问题描述
网关使用springcloud gateway,监听端口8080,注册中心使用nacos
正常情况下,gateway根据服务名转发,配置如下
- id: im_routeuri: lb://impredicates:- Path=/im/**
访问网关http://localhost:8080/im/**,即可将请求转发到IM服务
同理,我们希望访问网关ws://localhost:8080/ws,也能转发到IM的netty服务器
一般情况下,如果要通过gateway转发websocket请求,我们需要做如下配置
- id: im_ws_routeuri: lb:ws://impredicates:- Path=/ws/**
但实际上,lb这个schema告诉gateway要走负载,gateway转发的时候,会到nacos拉取im的服务清单,将其替换为 ip + 端口,而nacos上注册的是springboot项目的端口,也就是8081,所以想转发到IM的8082上,默认是没办法利用gateway的服务发现机制的,只能直接配置服务的ip地址,如下
- id: im_ws_routeuri: ws://localhost:8082predicates:- Path=/ws
不过这样也就失去使用网关的意义了
寻找思路
先观察下gateway转发的流程,找到DispatcherHandler的handle方法
@Overridepublic Mono<Void> handle(ServerWebExchange exchange) {if (this.handlerMappings == null) {return createNotFoundError();}if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {return handlePreFlight(exchange);}return Flux.fromIterable(this.handlerMappings).concatMap(mapping -> mapping.getHandler(exchange)).next().switchIfEmpty(createNotFoundError()).onErrorResume(ex -> handleDispatchError(exchange, ex)).flatMap(handler -> handleRequestWith(exchange, handler));}
首先遍历内部的HandlerMapping,依次调用其getHandler方法,寻找能处理当前请求的Handler
HandlerMapping共有四个,一般来说我们配置了上述的路由,会在第三个RoutePredicateHandlerMapping返回一个Handler
Handler类型为FilteringWebHandler,其中包含了一组Filter
找到Handler后,在DispatcherHandler # handle方法的最后一行,调了handleRequestWith方法
private Mono<Void> handleRequestWith(ServerWebExchange exchange, Object handler) {if (ObjectUtils.nullSafeEquals(exchange.getResponse().getStatusCode(), HttpStatus.FORBIDDEN)) {return Mono.empty(); // CORS rejection}if (this.handlerAdapters != null) {for (HandlerAdapter adapter : this.handlerAdapters) {if (adapter.supports(handler)) {return adapter.handle(exchange, handler).flatMap(result -> handleResult(exchange, result));}}}return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));}
然后遍历了HandlerAdapter,一共有四个
这里起作用的是最后一个SimpleHandlerAdapter,进入其handle方法
@Overridepublic Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {WebHandler webHandler = (WebHandler) handler;Mono<Void> mono = webHandler.handle(exchange);return mono.then(Mono.empty());}
调用了上面找到Handler的handle方法
@Overridepublic Mono<Void> handle(ServerWebExchange exchange) {Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);List<GatewayFilter> gatewayFilters = route.getFilters();List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);combined.addAll(gatewayFilters);// TODO: needed or cached?AnnotationAwareOrderComparator.sort(combined);if (logger.isDebugEnabled()) {logger.debug("Sorted gatewayFilterFactories: " + combined);}return new DefaultGatewayFilterChain(combined).filter(exchange);}
最后filter方法依次执行了其中的Filter
@Overridepublic Mono<Void> filter(ServerWebExchange exchange) {return Mono.defer(() -> {if (this.index < filters.size()) {GatewayFilter filter = filters.get(this.index);DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this, this.index + 1);return filter.filter(exchange, chain);}else {return Mono.empty(); // complete}});}}
在这些Filter中,有一个ReactiveLoadBalancerClientFilter,会完成从nacos拉取服务清单替换请求地址的任务
看下它的filter方法
@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {return chain.filter(exchange);}// preserve the original urladdOriginalRequestUrl(exchange, url);if (log.isTraceEnabled()) {log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName() + " url before: " + url);}URI requestUri = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);String serviceId = requestUri.getHost();Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator.getSupportedLifecycleProcessors(clientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),RequestDataContext.class, ResponseData.class, ServiceInstance.class);DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest<>(new RequestDataContext(new RequestData(exchange.getRequest()), getHint(serviceId)));return choose(lbRequest, serviceId, supportedLifecycleProcessors).doOnNext(response -> {if (!response.hasServer()) {supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.DISCARD, lbRequest, response)));throw NotFoundException.create(properties.isUse404(), "Unable to find instance for " + url.getHost());}ServiceInstance retrievedInstance = response.getServer();URI uri = exchange.getRequest().getURI();// if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,// if the loadbalancer doesn't provide one.String overrideScheme = retrievedInstance.isSecure() ? "https" : "http";if (schemePrefix != null) {overrideScheme = url.getScheme();}DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(retrievedInstance,overrideScheme);URI requestUrl = reconstructURI(serviceInstance, uri);if (log.isTraceEnabled()) {log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);}exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);exchange.getAttributes().put(GATEWAY_LOADBALANCER_RESPONSE_ATTR, response);supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, response));}).then(chain.filter(exchange)).doOnError(throwable -> supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(CompletionContext.Status.FAILED, throwable, lbRequest,exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR))))).doOnSuccess(aVoid -> supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(CompletionContext.Status.SUCCESS, lbRequest,exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR),new ResponseData(exchange.getResponse(), new RequestData(exchange.getRequest()))))));}
首先判断uri配置了lb,需要根据服务名做负载转发
在这个filter前,已经根据请求路径/ws和router配置,将请求地址转换成了 服务名:请求路径的形式,也就是将ws://localhost:8080/ws转化成了ws://im/ws
然后根据服务名im到nacos获取到服务的实际地址
ServiceInstance retrievedInstance = response.getServer();
其中包含了ip和端口
最后替换出实际要转发的地址
所以我们只要在这一步,根据需要将端口8081改成8082,就能实现我们要的效果了
最终解决方案
最初是想自定义一个HandlerMapping完成转发的,但看下来后直接改源码更便捷一些,所以直接在项目中新建一个同路径的类,将源码copy进来,覆盖掉这个Filter
然后在获取到nacos实例后,修改掉端口号,就能改变最终的目标地址
在获取服务实例ServiceInstance retrievedInstance = response.getServer()这行后面加一段代码
if (url.toString().equals("ws://im/ws")) {try {Field field = retrievedInstance.getClass().getDeclaredField("port");field.setAccessible(true);field.set(retrievedInstance, 8082);} catch (Exception e) {e.printStackTrace();}
}
im相关的路由配置
- id: im_routeuri: lb://impredicates:- Path=/im/**- id: im_ws_routeuri: lb:ws://impredicates:- Path=/ws/**