1、netty的线程调度问题
在netty的TCP调度中,线程的调度封装在NioEventLoopGroup中,线程执行则封装在NioEventLoop中。
线程调度规则封装在MultithreadEventExecutorGroup的next方法中,这个方法又封装了EventExecutorChooserFactory(实现在DefaultEventExecutorChooserFactory)的next方法。
结果就是轮换着用线程。
如果我们在2个线程上按顺序发送了三个http请求:请求1、请求2、请求3.
其中请求1耗时较长,请求2很快处理完成,我们就希望请求3能由线程2处理,而不是在请求1后面排队。但实际上netty把请求3分配给了线程1:
2、解决方法
为了解决请求被netty线程阻塞的问题,要么定制EventExecutorChooserFactory要么定制NioEventLoopGroup。由于定制EventExecutorChooserFactory入参较复杂,所以我们选择定制NioEventLoopGroup,定制的对象名为CustomEventLoopGroup,代码如下:
/*用来判断线程状态,有的请求响应耗时较长,netty默认的next实现是无脑轮换,
* 那么 有空闲线程也可能选择到正在执行任务的线程 导致原本可以快速返回的请求在慢请求后面等待
* 优先选择空闲线程 如果没有空闲线程则按照原来netty逻辑选择
* 但是客户端应该尽量不同业务使用不同的长连接 */
public class CustomEventLoopGroup extends NioEventLoopGroup {static final Logger logger = Logger.getLogger(CustomEventLoopGroup.class);public CustomEventLoopGroup(int nThreads, Executor executor) {super(nThreads, executor);}@Overridepublic EventLoop next() {EventLoop elNext = null;EventLoop el = super.next(); elNext = el;do { if (elNext instanceof NioEventLoop nioEvent) {logger.debug("pendingTasks: " + nioEvent.pendingTasks());ThreadProperties threadInfo = nioEvent.threadProperties();logger.debug("[" + threadInfo.id() + "] threadInfo.state: " + threadInfo.state());if (threadInfo.state() == Thread.State.RUNNABLE) {return elNext;}}elNext = super.next();if (elNext == null) {return el;}if (elNext.equals(el)) {return el;}} while (true);}
}
服务器使用CustomEventLoopGroup代码如下:
public NettyServerHttp1(int portIn) {port = portIn;EventLoopGroup bossGroup = new NioEventLoopGroup(1);ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);EventLoopGroup workerGroup = new CustomEventLoopGroup(2, executor);try {ServerBootstrap b = new ServerBootstrap();b.option(ChannelOption.SO_BACKLOG, 1024);b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new NettyServerInitializer(null));Channel ch = b.bind(port).sync().channel();ch.closeFuture().sync();} catch (InterruptedException e) {logger.error("Server start err: " + e);logger.error(e.getStackTrace());} finally {logger.info("Server Shutdown!");bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}