20. 从零用Rust编写正反向代理,四层反向代理stream(tcp与udp)实现

wmproxy

wmproxy已用Rust实现http/https代理, socks5代理, 反向代理, 静态文件服务器,四层TCP/UDP转发,内网穿透,后续将实现websocket代理等,会将实现过程分享出来,感兴趣的可以一起造个轮子

项目地址

gite: https://gitee.com/tickbh/wmproxy

github: https://github.com/tickbh/wmproxy

四层代理

四层代理,也称为网络层代理,是基于IP地址和端口号的代理方式。它只关心数据包的源IP地址、目的IP地址、源端口号和目的端口号,不关心数据包的具体内容。四层代理主要通过报文中的目标地址和端口,再加上负载均衡设备设置的服务器选择方式,决定最终选择的内部服务器。

因为四层代理不用处理任何相关的包信息,只需将包数据传递给正确的服务器即可,所以实现相对比较简单。

以下是OSI七层模型的示意图,来源于网上

图片.png

实现方式

双端建立连接,也就是收到客户端的连接的时候,同时建立一条通往服务端的连接,然后做双向绑定即可完成服务。

四层代理还有udp的转发需求,需要同步将udp的数据进行转发,udp的处理方式处理会相对复杂一些,因为当前地址只有绑定一份,但是可能来自各种不同的地址,不同的客户端的(remote_ip, remote_port)我们需要当成一个全新的客户端。

而且有时候无法主动感知是否已经被断开了,所以也必须有超时机制,好在超时的时候能及时释放掉连接,好让系统及时的socket资源。

TCP实现

tcp找到相应的地址,连接,并双向绑定即可

pub async fn process<T>(data: Arc<Mutex<StreamConfig>>,local_addr: SocketAddr,mut inbound: T,_addr: SocketAddr,
) -> ProxyResult<()>
whereT: AsyncRead + AsyncWrite + Unpin + std::marker::Send + 'static,
{let value = data.lock().await;for (_, s) in value.server.iter().enumerate() {if s.bind_addr.port() == local_addr.port() {let addr = ReverseHelper::get_upstream_addr(&s.upstream, "")?;let mut connect = HealthCheck::connect(&addr).await?;copy_bidirectional(&mut inbound, &mut connect).await?;break;}}Ok(())
}
UDP实现

UDP相对比较复杂,下面我们先列举内部的流程图

根据地址连接发送数据到
将Receiver传到以接收数据
否,将数据Sender给
异步读取数据并发送
绑定反向udp端口
客户端
是否第一次
创建异步协程
异步协程中

在stream绑定的时候,要区分出TCP还是UDP的,做分别的绑定

/// stream的绑定,按bind_mode区分出udp或者是tcp,返回相应的列表
pub async fn bind(&mut self) -> ProxyResult<(Vec<TcpListener>, Vec<StreamUdp>)> {let mut listeners = vec![];let mut udp_listeners = vec![];let mut bind_port = HashSet::new();for value in &self.server.clone() {if bind_port.contains(&value.bind_addr.port()) {continue;}bind_port.insert(value.bind_addr.port());if value.bind_mode == "udp" {let listener = Helper::bind_upd(value.bind_addr).await?;udp_listeners.push(StreamUdp::new(listener, value.clone()));} else {let listener = Helper::bind(value.bind_addr).await?;listeners.push(listener);}}Ok((listeners, udp_listeners))
}

我们会对连接做分别的监听,下面是udp的获取是否有新数据:

async fn multi_udp_listen_work(listens: &mut Vec<StreamUdp>,
) -> (io::Result<(Vec<u8>, SocketAddr)>, usize) {if !listens.is_empty() {let (data, index, _) =select_all(listens.iter_mut().map(|listener| {listener.next().boxed()})).await;if data.is_none() {return (Err(io::Error::new(io::ErrorKind::InvalidInput, "read none data")), index)}(data.unwrap(), index)} else {let pend = std::future::pending();let () = pend.await;unreachable!()}
}

此处我们用next,也就是我们实现了 futures_core::Stream接口,用Poll的方式来注册实现有事件的时候来通知。

在tokio中,在read或者write的时候返回Poll::Pending,将会将socket的可读可写注册到底层,如果一旦系统可读可写就会通知该接口,将会重新执行一遍futures_core::Stream

我们将同时可以处理可读可写可发送事件,如果接口超时我们将关闭相应的接口。

impl Stream for StreamUdp {type Item = io::Result<(Vec<u8>, SocketAddr)>;fn poll_next(mut self: std::pin::Pin<&mut Self>,cx: &mut std::task::Context<'_>,) -> std::task::Poll<Option<Self::Item>> {let _ = self.poll_write(cx)?;let _ = self.poll_sender(cx)?;self.poll_read(cx)}
}

下面是主要的StreamUdp

/// Udp转发的处理结构,缓存一些数值以做中转
pub struct StreamUdp {/// 读的缓冲类,避免每次都释放pub buf: BinaryMut,/// 核心的udp绑定端口pub socket: UdpSocket,pub server: ServerConfig,/// 如果接收该数据大小为0,那么则代表通知数据关闭pub receiver: Receiver<(Vec<u8>, SocketAddr)>,/// 将发送器传达给每个子协程pub sender: Sender<(Vec<u8>, SocketAddr)>,/// 接收的缓存数据,无法保证全部直接进行发送完毕pub cache_data: LinkedList<(Vec<u8>, SocketAddr)>,/// 发送的缓存数据,无法保证全部直接进行发送完毕pub send_cache_data: LinkedList<(Vec<u8>, SocketAddr)>,/// 每个地址绑定的对象,包含Sender,最后操作时间,超时时间remote_sockets: HashMap<SocketAddr, InnerUdp>,
}

结果测试

我们自己开一个udp服务端,绑定了本地的8089,我们将接收到的数据前面加上from server:并进行返回,代理端我们绑定了84的端口,并将udp数据转发给8089端:

use tokio::net::UdpSocket;
use std::io;#[tokio::main]
async fn main() -> io::Result<()> {let sock = UdpSocket::bind("0.0.0.0:8089").await?;let mut buf = [0; 1024];loop {let (len, addr) = sock.recv_from(&mut buf).await?;let mut vec = "from server: ".as_bytes().to_vec();vec.extend(&buf[..len]);let _ = sock.send_to(&vec, addr).await?;}
}

客户端我们用nc运行:

图片.png

可以看出两个客户端互相独立,彼此返回的数据均符合预期,正常的接收及返回。

TCP我们绑定了83端口并转发到HTTP的本地端口8080,我们用curl进行测试,符合预期,如图:

图片.png

结语

至此四层的反向代理TCP/UDP均已完成,也符合预期。

点击 [关注][在看][点赞] 是对作者最大的支持

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/239023.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux 下GEO Server发布图层后,中文乱码解决方案

发布的图层&#xff0c;显示中文乱码&#xff0c;都是框框&#xff1a;如“口口” 第一步先查看Linux字符集 如下命令所示&#xff1a; 1.查看当前系统语言 echo $LANG2.查看安装的语言包 locale如果上面的命令执行后显示的是en_US.UTF-8&#xff0c;则说明当前语言系统及安…

创意交融:集成自定义报表和仪表盘设计器,实现图标替换

前言 在现代数据分析领域&#xff0c;随着对报表和数据分析的需求不断增长&#xff0c;市场上涌现了许多嵌入式报表工具。这些工具能够与企业现有的OA、ERP、MES、CRM等应用系统深度集成&#xff0c;实现对业务数据的自助式分析。然而&#xff0c;在实际应用中&#xff0c;不同…

实战 php 使用 wkhtmltopdf 生成pdf的全过程

公司里边有生成pdf报告的业务需求,之前有过尝试用tcpdf,直接生成的pdf的过程,但是pdf报告的内容数据,根据不同内容的变化,都是各种各样的bug,一直处理修修补补的状态,让后台开发人员很是头疼. 经过思索和甄选,总结出我们的业务中是由于样式不可控导致的,当时从逻辑上就思考到用…

QT quick基础:组件gridview

组件gridview与android中gridview布局效果相同。下面记录qt quick该组件的使用方法。 方法一&#xff1a; // ContactModel.qml import QtQuick 2.0ListModel {ListElement {name: "1"portrait: "icons/ic_find.png"}ListElement {name: "2"por…

【iOS】数据存储方式总结(持久化)沙盒结构

在iOS开发中&#xff0c;我们经常性地需要存储一些状态和数据&#xff0c;比如用户对于App的相关设置、需要在本地缓存的数据等等&#xff0c;本篇文章将介绍六个主要的数据存储方式 iOS中数据存储方式&#xff08;数据持久化&#xff09; 根据要存储的数据大小、存储数据以及…

scrollTop与offsetTop解决小分辨率区域块向上滚动效果效果,结合animation与@keyframes实现标题左右闪动更换颜色效果。

scrollTop 是一个属性&#xff0c;它表示元素的滚动内容垂直滚动条的位置。对于可滚动元素&#xff0c;scrollTop 属性返回垂直滚动条滚动的像素数&#xff0c;即元素顶部被隐藏的像素数。 offsetTop 是一个属性&#xff0c;用于获取一个元素相对于其父元素的垂直偏移量&…

bee工具的使用及创建第一个项目

前提文章&#xff1a;beego的安装及配置参数说明-CSDN博客 提示&#xff1a;beego框架下项目需要再GOPATH/src下进行开发&#xff0c;我的GOPATH是C:\Users\leell\go 一、web项目创建 通过 bee new 创建web项目 C:\Users\leell\go\src>bee new beego-web 2024/01/15 21:…

【新】Unity Meta Quest MR 开发(一):Passthrough 透视配置

文章目录 &#x1f4d5;教程说明&#x1f4d5;配置透视的串流调试功能&#x1f4d5;第一步&#xff1a;设置 OVRManager&#x1f4d5;第二步&#xff1a;添加 OVRPassthroughLayer 脚本&#x1f4d5;第三步&#xff1a;在场景中添加虚拟物体&#x1f4d5;第四步&#xff1a;设置…

C++系统笔记教程----vscode远程连接ssh

C系统笔记教程 文章目录 C系统笔记教程前言开发环境配置总结 前言 开发环境配置 Ubuntu20.24VScode 如果没有linux系统&#xff0c;但是想用其编译&#xff0c;可以使用ssh远程连接。 首先进入vscode,打开远程连接窗口&#xff08;蓝色的小箭头这&#xff09; 选择连接到主机…

K8S 存储卷

意义&#xff1a;存储卷----数据卷 容器内的目录和宿主机的目录进行挂载 容器在系统上的生命周期是短暂的&#xff0c;delete,k8s用控制器创建的pod&#xff0c;delete相当于重启&#xff0c;容器的状态也会回复到初始状态 一旦回到初始状态&#xff0c;所有的后天编辑的文件…

什么是云服务器?云服务器的工作原理是介绍

阿里云服务器ECS英文全程Elastic Compute Service&#xff0c;云服务器ECS是一种安全可靠、弹性可伸缩的云计算服务&#xff0c;阿里云提供多种云服务器ECS实例规格&#xff0c;如经济型e实例、通用算力型u1、ECS计算型c7、通用型g7、GPU实例等&#xff0c;阿里云百科aliyunbai…

【教3妹学编程-算法题】3008. 找出数组中的美丽下标 II

3妹&#xff1a;呜呜&#xff0c;烦死了&#xff0c; 脸上长了一个痘 2哥 : 不要在意这些细节嘛&#xff0c;不用管它&#xff0c;过两天自然不就好了。 3妹&#xff1a;切&#xff0c;你不懂&#xff0c;影响这两天的心情哇。 2哥 : 我看你是不急着找工作了啊&#xff0c; 工作…

rke2 Offline Deploy Rancher v2.8.0 latest (helm 离线部署 rancher v2.8.0)

文章目录 1. 预备条件2. 为什么是三个节点&#xff1f;​3. 配置私有仓库4. 介质清单5. 安装 helm6. 安装 cert-manager6.1 下载介质6.2 镜像入库6.3 helm 部署6.4 cert-manager 卸载 7. 安装 rancher7.1 镜像入库7.2 helm 安装 8. 验证9. 界面预览10. 卸载 1. 预备条件 所有支…

k8s中的基础概念

k8s可以从硬件和软件两方面来理解&#xff1a; 硬件&#xff1a; 1、节点&#xff08;Node&#xff09;&#xff1a;类似于手机、平板、电脑 2、集群&#xff08;Cluster&#xff09;&#xff1a;多个节点组合到一起 3、持久卷&#xff08;Persistent Volumes&#xff09;&…

Ubuntu20.04安装配置OpenCV-Python库并首次执行读图

一、选择三方提供的预编译包安装&#xff1a; 可以从官网下载 OpenCV 的安装包&#xff0c;编译后使用&#xff1b;也可以直接使用第三方提供的预编译包 安装。显然后者不需要执行编译步骤&#xff0c;更便捷。选择由 PyPI 提供的 OpenCV 安装包&#xff0c;可以在 https://py…

k8s源码阅读环境配置

源码阅读环境配置 k8s代码的阅读可以让我们更加深刻的理解k8s各组件的工作原理&#xff0c;同时提升我们Go编程能力。 IDE使用Goland&#xff0c;代码阅读环境需要进行如下配置&#xff1a; 从github上下载代码&#xff1a;https://github.com/kubernetes/kubernetes在GOPATH目…

git切换到另一分支更改也会随之过去

一次的修改如果没有 commit如果切换到另一分支就会把修改带到另一个分支 这时可以使用 git stash 其他使用场景 切换分支&#xff1a;当正在一个分支上工作&#xff0c;但需要临时切换到另一个分支处理一些紧急任务时&#xff0c;可以使用 git stash 保存当前的工作进度。完成…

【GitHub】如何上传文件夹到GitHub上(配图详解)

一、如果没有账号要先创建账号&#xff08;有账号跳过此步骤&#xff09;二、建立一个仓库&#xff08;有仓库跳过此步骤&#xff09;三、复制仓库地址四、以下为本地操作 1、在本地新建一个空文件夹2、上传文件 2.1、在空文件夹内&#xff0c;右键选择Git Bash Here2.2、弹出G…

Python - 深夜数据结构与算法之 LRUCache

目录 一.引言 二.LRU Cache 简介 1.实现特性 2.工作流程 三.LRU Cache 实战 1.HashMap ListNode 2.OrderedDict 四.总结 一.引言 LRU 即 Least Recently Used 意为最近使用&#xff0c;它是一种局部 Cache 的缓存方法&#xff0c;用于存储最近使用的元素&#xff0c;…

记一个有关 Vuetify 组件遇到的一些问题

Vuetify 官网地址 所有Vuetify 组件 — Vuetify 1、Combobox使用对象数组 Combobox 组合框 — Vuetify items数据使用对象数组时&#xff0c;默认选中的是整个对象&#xff0c;要对数据进行处理 <v-comboboxv-model"defaultInfo.variableKey":rules"rules…