Java NIO实现高性能HTTP代理

NIO采用多路复用IO模型,相比传统BIO(阻塞IO),通过轮询机制检测注册的Channel是否有事件发生,可以实现一个线程处理客户端的多个连接,极大提升了并发性能。
在5年前,本人出于对HTTP正向代理的好奇新,那时候也在学习JAVA,了解到了NIO,就想用NIO写一个正向代理软件,当时虽然实现了正向代理,但是代码逻辑及其混乱,而且没有经过测试也许有不少的bug
近期因为找工作,又复习起了以往的一些JAVA知识,包括JVM内存模型、GC垃圾回收机制等等,其中也包括NIO。现在回头再看NIO,理解也更深刻了一点。
在多路复用IO模型中,会有一个线程不断去轮询多个socket的状态,只有当socket真正有读写事件时,才真正调用实际的IO读写操作。因为在多路复用IO模型中,只需要使用一个线程就可以管理多个socket,系统不需要建立新的进程或者线程,也不必维护这些线程和进程,并且只有在真正有socket 读写事件进行时,才会使用IO资源,所以它大大减少了资源占用。在Java NIO中,是通过selector.select()去查询每个通道是否有到达事件,如果没有事件,则一直阻塞在那里,因此这种方式会导致用户线程的阻塞。多路复用IO模式,通过一个线程就可以管理多个socket,只有当socket 真正有读写事件发生才会占用资源来进行实际的读写操作。因此,多路复用IO比较适合连接数比较多的情况。
本HTTP代理软件只能代理HTTP和HTTPS协议,分享出来共广大网友参考和学习
1.Bootstrap类
此类用于创建和启动一个HTTP代理服务
package org.example;import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;public class Bootstrap {private final Logger logger = LogManager.getLogger(Bootstrap.class);private AbstractEventLoop serverEventLoop;private int port;public Bootstrap() {port = 8888;serverEventLoop = new ServerEventLoop(this);}public Bootstrap bindPort(int port) {try {this.port = port;this.serverEventLoop.bind(port);} catch (Exception e) {logger.error("open server socket channel error.", e);}return this;}public void start() {serverEventLoop.getSelector().wakeup();logger.info("Proxy server started at port {}.", port);}public AbstractEventLoop getServerEventLoop() {return serverEventLoop;}
}
2.ServerEventLoop
事件循环,单线程处理事件循环。包括客户端的连接和读写请求,目标服务器的连接和读写事件,在同一个事件循环中处理。
package org.example;import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.example.common.HttpRequestParser;import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;public class ServerEventLoop extends AbstractEventLoop {private final Logger logger = LogManager.getLogger(ServerEventLoop.class);public ServerEventLoop(Bootstrap bootstrap) {super(bootstrap);}@Overrideprotected void processSelectedKey(SelectionKey key) {if (key.isValid() && key.isAcceptable()) {if (key.attachment() instanceof Acceptor acceptor) {acceptor.accept();}}if (key.isValid() && key.isReadable()) {if (key.attachment() instanceof ChannelHandler channelHandler) {channelHandler.handleRead();}}if (key.isValid() && key.isConnectable()) {key.interestOpsAnd(~SelectionKey.OP_CONNECT);if (key.attachment() instanceof ChannelHandler channelHandler) {channelHandler.handleConnect();}}if (key.isValid() && key.isWritable()) {key.interestOpsAnd(~SelectionKey.OP_WRITE);if (key.attachment() instanceof ChannelHandler channelHandler) {channelHandler.handleWrite();}}}@Overridepublic void bind(int port) throws Exception {ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);SelectionKey key = serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);key.attach(new Acceptor(serverSocketChannel));serverSocketChannel.bind(new InetSocketAddress(port));}class Acceptor {ServerSocketChannel ssc;public Acceptor(ServerSocketChannel ssc) {this.ssc = ssc;}public void accept() {try {SocketChannel socketChannel = ssc.accept();socketChannel.configureBlocking(false);socketChannel.register(selector, SelectionKey.OP_READ, new ClientChannelHandler(socketChannel));logger.info("accept client connection");} catch (IOException e) {logger.error("accept error");}}}abstract class ChannelHandler {Logger logger;SocketChannel channel;ByteBuffer writeBuffer;public ChannelHandler(SocketChannel channel) {this.logger = LogManager.getLogger(this.getClass());this.channel = channel;this.writeBuffer = null;}abstract void handleRead();public void handleWrite() {doWrite();}public abstract void onChannelClose();public ByteBuffer doRead() {ByteBuffer buffer = ByteBuffer.allocate(4096);try {int len = channel.read(buffer);if (len == -1) {logger.info("read end-of-stream, close channel {}", channel);channel.close();onChannelClose();}if (len > 0) {buffer.flip();}} catch (IOException e) {logger.error("read channel error");try {channel.close();onChannelClose();} catch (IOException ex) {logger.error("close channel error.");}}return buffer;}public void doWrite() {if (writeBuffer != null) {try {while (writeBuffer.hasRemaining()) {channel.write(writeBuffer);}} catch (IOException e) {logger.error("write channel error.");try {channel.close();onChannelClose();} catch (IOException ex) {logger.error("close channel error");}}writeBuffer = null;}}public void handleConnect() {}}class ClientChannelHandler extends ChannelHandler {HttpRequestParser requestParser;private SelectableChannel proxyChannel;public ClientChannelHandler(SocketChannel sc) {super(sc);this.channel = sc;this.requestParser = new HttpRequestParser();this.proxyChannel = null;}@Overridepublic void handleRead() {if (requestParser.isParsed()) {if (proxyChannel != null) {SelectionKey proxyKey = proxyChannel.keyFor(selector);if (proxyKey != null && proxyKey.isValid() && proxyKey.attachment() instanceof ProxyChannelHandler proxyHandler) {//需要等待ProxyHandler的写入缓存为空后才可读取客户端的数据if (proxyHandler.writeBuffer == null) {ByteBuffer buffer = doRead();if (buffer.hasRemaining() && proxyKey.isValid()) {proxyHandler.writeBuffer = buffer;proxyKey.interestOpsOr(SelectionKey.OP_WRITE);}}}}} else {ByteBuffer buffer = doRead();requestParser.putFromByteBuffer(buffer);if (requestParser.isParsed()) {//连接到目标服务器ByteBuffer buf = null;if (requestParser.getMethod().equals(HttpRequestParser.HTTP_METHOD_CONNECT)) {//回写客户端连接成功SelectionKey clientKey = channel.keyFor(selector);if (clientKey != null && clientKey.isValid() && clientKey.attachment() instanceof ClientChannelHandler clientHandler) {clientHandler.writeBuffer = ByteBuffer.wrap((requestParser.getProtocol() + " 200 Connection Established\r\n\r\n").getBytes());clientKey.interestOpsOr(SelectionKey.OP_WRITE);}} else {//将缓存的客户端的数据通过代理转发byte[] allBytes = requestParser.getAllBytes();buf = ByteBuffer.wrap(allBytes);}this.proxyChannel = connect(requestParser.getAddress(), buf);}}}@Overridepublic void onChannelClose() {try {if (proxyChannel != null) {proxyChannel.close();}} catch (IOException e) {logger.error("close channel error");}}private SocketChannel connect(String address, ByteBuffer buffer) {String host = address;int port = 80;if (address.contains(":")) {host = address.split(":")[0].trim();port = Integer.parseInt(address.split(":")[1].trim());}SocketAddress target = new InetSocketAddress(host, port);SocketChannel socketChannel = null;SelectionKey proxyKey = null;int step = 0;try {socketChannel = SocketChannel.open();socketChannel.configureBlocking(false);step = 1;ProxyChannelHandler proxyHandler = new ProxyChannelHandler(socketChannel);proxyHandler.setClientChannel(channel);proxyHandler.writeBuffer = buffer;proxyKey = socketChannel.register(selector, SelectionKey.OP_CONNECT, proxyHandler);proxyKey.interestOpsOr(SelectionKey.OP_WRITE);step = 2;socketChannel.connect(target);} catch (IOException e) {logger.error("connect error.");switch (step) {case 2:proxyKey.cancel();case 1:try {socketChannel.close();} catch (IOException ex) {logger.error("close channel error.");}socketChannel = null;break;}}return socketChannel;}}class ProxyChannelHandler extends ChannelHandler {private SelectableChannel clientChannel;public ProxyChannelHandler(SocketChannel sc) {super(sc);clientChannel = null;}@Overridepublic void handleConnect() {try {if (channel.isConnectionPending() && channel.finishConnect()) {SelectionKey proxyKey = channel.keyFor(selector);proxyKey.interestOpsOr(SelectionKey.OP_READ);}} catch (IOException e) {try {channel.close();onChannelClose();} catch (IOException ex) {logger.error("close channel error.");}logger.error("finish connection error.");}}@Overridepublic void handleRead() {if (clientChannel != null) {SelectionKey clientKey = clientChannel.keyFor(selector);if (clientKey != null && clientKey.isValid() && clientKey.attachment() instanceof ClientChannelHandler clientHandler) {if (clientHandler.writeBuffer == null) {ByteBuffer buffer = doRead();if (buffer.hasRemaining() && clientKey.isValid()) {clientHandler.writeBuffer = buffer;clientKey.interestOpsOr(SelectionKey.OP_WRITE);}}}}}@Overridepublic void onChannelClose() {try {if (clientChannel != null) {clientChannel.close();}} catch (IOException e) {logger.error("close channel error");}}public void setClientChannel(SocketChannel client) {this.clientChannel = client;}}
}
3.AbstractEventLoop
事件循环的抽象类
package org.example;import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executors;public abstract class AbstractEventLoop implements Runnable {private final Logger logger = LogManager.getLogger(AbstractEventLoop.class);protected Selector selector;protected Bootstrap bootstrap;public AbstractEventLoop(Bootstrap bootstrap) {this.bootstrap = bootstrap;openSelector();Executors.newSingleThreadExecutor().submit(this);}public void bind(int port) throws Exception {throw new Exception("not support");}@Overridepublic void run() {while (true) {try {if (selector.select() > 0) {processSelectedKeys();}} catch (Exception e) {logger.error("select error.", e);}}}private void processSelectedKeys() {Set<SelectionKey> keys = selector.selectedKeys();Iterator<SelectionKey> iterator = keys.iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();processSelectedKey(key);}}protected abstract void processSelectedKey(SelectionKey key);public Selector openSelector() {try {this.selector = Selector.open();return this.selector;} catch (IOException e) {logger.error("open selector error.", e);}return null;}public Selector getSelector() {return selector;}
}
4.HttpRequestParser
用于解析HTTP请求报文中的请求头,可以获取主机和端口号
package org.example.common;import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;public class HttpRequestParser {private final Logger logger = LogManager.getLogger(HttpRequestParser.class);public static final String COLON = ":";public static final String REQUEST_HEADER_HOST_PREFIX = "host:";private UnboundedByteBuffer requestBytes = new UnboundedByteBuffer();private List<String> headers = new ArrayList<>();public static final String HTTP_METHOD_GET = "GET";public static final String HTTP_METHOD_POST = "POST";public static final String HTTP_METHOD_PUT = "PUT";public static final String HTTP_METHOD_DELETE = "DELETE";public static final String HTTP_METHOD_TRACE = "TRACE";public static final String HTTP_METHOD_OPTIONS = "OPTIONS";public static final String HTTP_METHOD_HEAD = "HEAD";public static final String HTTP_METHOD_CONNECT = "CONNECT";private String address;private String protocol;private String method;private boolean parsed = false;private StringBuffer reqHeaderBuffer = new StringBuffer();public void putFromByteBuffer(ByteBuffer buffer) {for (; buffer.hasRemaining(); ) {byte b = buffer.get();requestBytes.addByte(b);reqHeaderBuffer.append((char) b);if (b == '\n' && reqHeaderBuffer.charAt(reqHeaderBuffer.length() - 2) == '\r') {if (reqHeaderBuffer.length() == 2) {parsed = true;logger.debug("Request header line end.");break;}String headerLine = reqHeaderBuffer.substring(0, reqHeaderBuffer.length() - 2);logger.debug("Request header line parsed {}", headerLine);headers.add(headerLine);if (headerLine.startsWith(HTTP_METHOD_GET)|| headerLine.startsWith(HTTP_METHOD_POST)|| headerLine.startsWith(HTTP_METHOD_PUT)|| headerLine.startsWith(HTTP_METHOD_DELETE)|| headerLine.startsWith(HTTP_METHOD_TRACE)|| headerLine.startsWith(HTTP_METHOD_OPTIONS)|| headerLine.startsWith(HTTP_METHOD_HEAD)|| headerLine.startsWith(HTTP_METHOD_CONNECT)) {this.protocol = headerLine.split(" ")[2].trim();this.method = headerLine.split(" ")[0].trim();} else if (headerLine.toLowerCase().startsWith(REQUEST_HEADER_HOST_PREFIX)) {this.address = headerLine.toLowerCase().replace(REQUEST_HEADER_HOST_PREFIX, "").trim();}reqHeaderBuffer.delete(0, reqHeaderBuffer.length());}}}public boolean isParsed() {return parsed;}public String getAddress() {return address;}public String getProtocol() {return protocol;}public String getMethod() {return method;}public byte[] getAllBytes() {return requestBytes.toByteArray();}
}
5.UnboundedByteBuffer
无界的字节缓冲区,每次会以两倍的容量扩容,可以用于追加存入客户端的请求数据,实现粘包
package org.example.common;public class UnboundedByteBuffer {private byte[] bytes;private int size;private int cap;private final int DEFAULT_CAP = 4096;private final int MAX_CAP = 1 << 30;public UnboundedByteBuffer() {this.cap = DEFAULT_CAP;this.bytes = new byte[this.cap];this.size = 0;}public void addBytes(byte[] data) {ensureCapacity(data.length);System.arraycopy(data, 0, bytes, size, data.length);this.size += data.length;}private void ensureCapacity(int scale) {if (scale + this.size > this.cap) {int tmpCap = this.cap;while (scale + this.size > tmpCap) {tmpCap = tmpCap << 1;}if (tmpCap > MAX_CAP) {return;}byte[] newBytes = new byte[tmpCap];System.arraycopy(this.bytes, 0, newBytes, 0, this.size);this.bytes = newBytes;}}public byte[] toByteArray() {byte[] ret = new byte[this.size];System.arraycopy(this.bytes, 0, ret, 0, this.size);return ret;}public void addByte(byte b) {ensureCapacity(1);this.bytes[this.size++] = b;}
}
以上实现是在单个事件循环线程中处理所有事件,一个更好的方案是将客户端的Channel和代理服务器与目标服务器的Channel区分开,分别在两个事件循环中处理。基本实现也和本文中的代码大体一致,两者在理论上应该存在性能差距,实际经过本人测试可以每秒处理客户端的上千个连接。代码传送门

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/469110.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

计算机课程管理:Spring Boot与工程认证的协同创新

3系统分析 3.1可行性分析 通过对本基于工程教育认证的计算机课程管理平台实行的目的初步调查和分析&#xff0c;提出可行性方案并对其一一进行论证。我们在这里主要从技术可行性、经济可行性、操作可行性等方面进行分析。 3.1.1技术可行性 本基于工程教育认证的计算机课程管理平…

<项目代码>YOLOv8 苹果腐烂识别<目标检测>

YOLOv8是一种单阶段&#xff08;one-stage&#xff09;检测算法&#xff0c;它将目标检测问题转化为一个回归问题&#xff0c;能够在一次前向传播过程中同时完成目标的分类和定位任务。相较于两阶段检测算法&#xff08;如Faster R-CNN&#xff09;&#xff0c;YOLOv8具有更高的…

游戏引擎学习第四天

视频参考:https://www.bilibili.com/video/BV1aDmqYnEnc/ BitBlt 是 Windows GDI&#xff08;图形设备接口&#xff09;中的一个函数&#xff0c;用于在设备上下文&#xff08;device context, DC&#xff09;之间复制位图数据。BitBlt 的主要用途是将一个图像区域从一个地方复…

SPIRE: Semantic Prompt-Driven Image Restoration 论文阅读笔记

这是一篇港科大学生在google research 实习期间发在ECCV2024的语义引导生成式修复的文章&#xff0c;港科大陈启峰也挂了名字。从首页图看效果确实很惊艳&#xff0c;尤其是第三行能用文本调控修复结果牌上的字。不过看起来更倾向于生成&#xff0c;对原图内容并不是很复原&…

如何平滑切换Containerd数据目录

如何平滑切换Containerd数据目录 大家好&#xff0c;我是秋意零。 这是工作中遇到的一个问题。搭建的服务平台&#xff0c;在使用的过程中频繁出现镜像本地拉取不到问题&#xff08;在项目群聊中老是被人出来&#x1f605;&#xff09;原因是由于/目录空间不足导致&#xff0…

Sharding运行模式、元数据、持久化详解

运行模式 单机模式 能够将数据源和规则等元数据信息持久化&#xff0c;但无法将元数据同步至多个Sharding实例&#xff0c;无法在集群环境中相互感知。 通过某一实例更新元数据之后&#xff0c;会导致其他实例由于获取不到最新的元数据而产生不一致的错误。 适用于工程师在本…

基于springboot+小程序的鲜花管理系统(鲜花1)

&#x1f449;文末查看项目功能视频演示获取源码sql脚本视频导入教程视频 1、项目介绍 本网上花店微信小程序分为管理员还有用户两个权限&#xff0c;管理员可以管理用户的基本信息内容&#xff0c;可以管理公告信息以及鲜花信息&#xff0c;能够与用户进行相互交流等操作&am…

金融学期末速成笔记

【拯救者】金融学速成&#xff08;基础习题&#xff09; 重点: 市场经济是发达的商品经济。在市场经济条件下&#xff0c;市场机制作为资源配置方式&#xff0c;发挥基础性作用。 除具有商品经济的一般特征外&#xff0c;与商品经济相比&#xff0c;市场经济还具有一些新的特征…

后悔没早点知道,Coze 插件 + Cursor 原来可以这样赚钱

最近智能体定制化赛道异常火爆。 打开闲鱼搜索"Coze 定制",密密麻麻的服务报价直接刷屏,即使表明看起来几十块的商家,一细聊,都是几百到上千不等的报价。 有趣的是,这些智能体定制化服务背后,最核心的不只是工作流设计,还有一个被很多人忽视的重要角色 —— …

嵌入式采集网关(golang版本)

为了一次编写到处运行&#xff0c;使用纯GO编写&#xff0c;排除CGO&#xff0c;解决在嵌入式中交叉编译难问题 硬件设备&#xff1a;移远EC200A-CN LTE Cat 4 无线通信模块&#xff0c;搭载openwrt操作系统&#xff0c;90M内存

基于Multisim数字电子秒表0-60S电路(含仿真和报告)

【全套资料.zip】数字电子秒表电路Multisim仿真设计数字电子技术 文章目录 功能一、Multisim仿真源文件二、原理文档报告资料下载【Multisim仿真报告讲解视频.zip】 功能 1.秒表最大计时值为60秒&#xff1b; 2. 2位数码管显示&#xff0c;分辨率为1秒&#xff1b; 3.具有清零…

昇思大模型平台打卡体验活动:项目2基于MindSpore通过GPT实现情感分类

昇思大模型平台打卡体验活动&#xff1a;项目2基于MindSpore通过GPT实现情感分类 1. 载入与处理数据集 在情感分类任务中&#xff0c;我们使用了IMDB数据集&#xff0c;首先需要对数据进行加载和处理。由于原数据集没有验证集&#xff0c;我们将训练集重新划分为训练集和验证…

Mac如何实现最简单的随时监测实时运行状态的方法

Mac book有着不同于Windows的设计逻辑与交互设计&#xff0c;使得Mac book有着非常棒的使用体验&#xff0c;但是在Mac电脑的使用时间过长时&#xff0c;电脑也会出现响应速度变慢或应用程序崩溃的情况&#xff0c;当发生的时候却不知道什么原因导致的&#xff0c;想要查询电脑…

有趣的Midjourney作品赏析(附提示词)

中文提示词&#xff1a;国风少年 C4D软件,高分辨率,超细节,超现实主义, 英文提示词&#xff1a;National Style Youth Cinema4D,high resolution,hyper detailed,surrealism, --niji 6 --ar 1:1 中文提示词&#xff1a;粘土模型&#xff0c;男性穿着中世纪欧洲蓝色盔甲&#x…

时序预测 | gamma伽马模型锂电池寿命预测 EM算法粒子滤波算法结合参数估计

时序预测 | gamma伽马模型锂电池寿命预测 EM算法粒子滤波算法结合参数估计 目录 时序预测 | gamma伽马模型锂电池寿命预测 EM算法粒子滤波算法结合参数估计预测效果基本介绍参考资料 预测效果 基本介绍 gamma伽马模型锂电池寿命预测 EM算法粒子滤波算法结合参数估计 伽马模型、…

男同事36岁,听说被裁拿了12万。今天看到他退了群,但下午领导就反悔了,让他回来,还要把12万补偿退回来

亲爱的读者们&#xff0c;今天咱们来聊聊职场那些事儿。你听说过吗&#xff1f;有位男同事&#xff0c;36岁&#xff0c;被裁了&#xff0c;拿了12万补偿金&#xff0c;然后退了群。你以为这就是结局&#xff1f;不&#xff0c;故事才刚刚开始&#xff01; 想象一下&#xff0…

李佳琦回到巅峰背后,双11成直播电商分水岭

时间倏忽而过&#xff0c;又一年的双11即将宣告结束。 从双11正式开始前的《新所有女生的offer》&#xff0c;到被作为“比价”标杆被其他平台直播间蹭、被与其他渠道品牌比较&#xff0c;再到直播间运营一时手快多发了红包……整个双11周期下来&#xff0c;李佳琦直播间在刷新…

Golang | Leetcode Golang题解之第546题移除盒子

题目&#xff1a; 题解&#xff1a; func removeBoxes(boxes []int) int {dp : [100][100][100]int{}var calculatePoints func(boxes []int, l, r, k int) intcalculatePoints func(boxes []int, l, r, k int) int {if l > r {return 0}if dp[l][r][k] 0 {r1, k1 : r, k…

Python | Leetcode Python题解之第553题最优除法

题目&#xff1b; 题解&#xff1a; class Solution:def optimalDivision(self, nums: List[int]) -> str:if len(nums) 1:return str(nums[0])if len(nums) 2:return str(nums[0]) "/" str(nums[1])return str(nums[0]) "/(" "/".joi…

新手 Vue 项目运行

前言&#xff1a;前面讲了我们已经将spingboot项目运行起来了&#xff0c;现在我们只需将后台管理的Vue项目运行起来即可完成整个项目。 在运行vue项目之前&#xff0c;请先运行springboot项目&#xff0c;运行步骤请看&#xff1a;运行Springboot Vue 项目_springbootvue项目…