linux 获取公网流量 tcpdump + python + C++

前言

需求为,统计linux上得上下行公网流量,常规得命令如iftop 、sar、ifstat、nload等只能获取流量得大小,不能区分公私网,所以需要通过抓取网络包并排除私网段才能拿到公网流量。下面提供了一些有效得解决思路,提供了部分得代码片段,但不提供整个代码内容。

方案

方案一:在Linux服务器上安装服务,抓取到数据包后发送到中央计算服务器

优点:①由于计算是在中央,并不会占用linux业务服务器得资源性能

缺点:①通过scp或其他传输手段,会占用一定得网络资源,特别当数据量大时

②中央计算服务器得资源易达到瓶颈,需要开多个计算节点

方案二:在Linux服务器上安装服务,抓取到数据包后本地处理,直接将结果发至中央

优点:①中央节点压力减小,数据实时性提升

缺点:①每个linux服务器都需要抓包并且处理,需要占用一定的计算资源,且单位包越大消耗越高

方案三:在同层交换机上放置旁路监控服务器,抓取广播域内所有包,逐个分析每个linux服务器上的流量情况

优点:

①独立部署,不会占用linux服务器资源

②维护相对容易,linux业务服务器与监控节点解耦,只需要维护监控节点而不需要维护linux服务器

缺点:

①需要跟linux服务器处在一个广播域,并且要求性能足够,当linux业务服务器数据过多时,易有瓶颈

②可能存在漏抓包的情况

③需要监听所有得vlan,并分别部署监控节点

方案四:加入流控设备,旁路到核心网关

优点:

①无需自研,有成熟的抓取功能,且有更多丰富的功能

缺点:

①成本较高

②缺乏控制能力,比如流量限速,阻断等

下面主要介绍第一和第二种方法得实现

方案一技术实现

先来看技术拓扑图,流程为:

  1. linux服务器开启tcpdump抓包
  2. linux服务器将pcap包发送到流量分析服务器
  3. 流量服务器进行公私网流量拆分,公网白名单过滤
  4. 获取步骤三得结果并传输到中央

下面讲解每一步得实现细节

1.linux服务器开启tcpdump抓包

核心命令为"tcpdump", "-i", interface , f"host {target_ip}", '-s 96',"-w", output_file ,'-p'

-i 接网卡名,host接希望抓取得ip(可不填),-s 96 表示只抓取包头部(可能有得包头超过这个长度,但96依然是个比较适中得值,-w 保存为pcap包,-p 关闭混杂模式(避免监听到广播得其他包使流量过多)

#执行tcpdump抓包
def run_tcpdump(target_ip):INTERVAL = 60interface = getInterface() #获取网卡名timestamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%S") now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") #获取开始时间output_file = f"{SAVEPATH}/xxx_{timestamp}.pcap"tcpdump_command = ["tcpdump", "-i", interface , f"host {target_ip}", '-s 96',"-w", output_file ,'-p']try:process = subprocess.Popen(tcpdump_command) #使用subprocess执行shell命令time.sleep(INTERVAL) #定义休眠时间process.terminate()  #结束shell命令process.wait()       #等待子进程退出,确保子进程清理完成except Exception as e:logger.error(f"xxx")

2.linux服务器将pcap包发送到流量分析服务器

这一个步骤没有太多可讲得,使用socket 或requests 或 subprocess(scp)都可以,目的就是将抓取到得包发送到流量分析服务器。

3.流量服务器进行公私网流量拆分,公网白名单过滤

这一步是关键步骤,公私网流量,公网白名单过滤拆分需要一定得计算性能,所以核心代码推荐用C++语言编写,博主测试过同代码情况下,python C++ java得表现能力,C++(或C)性能要远超其他语言,如同体积包,假设占用30%得单核心性能,C++可以做到1%以内。

代码要实现两点:

1.公私网彻底拆分

2.过滤公网白名单内的(不希望统计得)数据

公私网流量拆分参考以下代码片段,首先剔除三类子网地址(0xFF000000 是掩码(255.0.0.0 的二进制形式),只保留 IP 地址的最高 8 位。0x0A000000 是 10.0.0.0 的二进制形式。)

注意:现在得大型网络多采用overlay得方式,有些ip虽然在私网段,但由于走了vxlan隧道占用了公网带宽,其实也是数据公网得。后续得处理需读者自行处理。

bool is_private_ip(const std::string &ip) {struct in_addr addr;inet_pton(AF_INET, ip.c_str(), &addr);uint32_t ip_addr = ntohl(addr.s_addr);// 10.0.0.0/8if ((ip_addr & 0xFF000000) == 0x0A000000) return true;// 172.16.0.0/12if ((ip_addr & 0xFFF00000) == 0xAC100000) return true;// 192.168.0.0/16if ((ip_addr & 0xFFFF0000) == 0xC0A80000) return true;return false;
}

定义4个变量分别存公网下行,公网上行,私网下行,私网上行

    uint64_t public_in= 0;uint64_t public_out = 0;uint64_t private_in = 0;uint64_t private_out = 0;  pcap_t *handle = pcap_open_offline(pcap_file.c_str(), errbuf); #获取pcap包pcap_next_ex(handle, &header, &data)  #调用libpcap 捕获库 const struct ip *ip_header = (struct ip *)(data + sizeof(struct ether_header)); #获取包得headstd::string src_ip = inet_ntoa(ip_header->ip_src); #获取src ipstd::string dst_ip = inet_ntoa(ip_header->ip_dst); #获取dst ip#判断是私网还是公网if (dst_ip == target_ip) {if (is_private_ip(src_ip)) {private_in += pkt_len;} else {public_in += pkt_len;}} else if (src_ip == target_ip) {if (is_private_ip(dst_ip)) {private_out += pkt_len;} else {public_out += pkt_len;}}

接下来是过滤不希望统计的ip名单,在上面代码的基础上再做一层判断即可,

    #判断是私网还是公网if (dst_ip == target_ip) {if (is_private_ip(src_ip)) {private_in += pkt_len;} else {if (filter_write(src_ip)){  #return true则公网ip不在白名单内,需要加和public_in += pkt_len;}}} else if (src_ip == target_ip) {if (is_private_ip(dst_ip)) {private_out += pkt_len;} else {if (filter_write(src_ip)){public_out += pkt_len;}}}

4.获取步骤三得结果上报并持久化

拿到公网流量后,传输给用于持久化的程序进行入库操作(对于此类数据,建议用clickhouse进行建库,对体量小的用mysql也可以接受).关于传输方式,可以选择使用生产消费者方式,使用消息中间件缓冲数据(如Kafka)。数据量小通过http/https协议传输后插入也可以。

kafka模板

def push_kafka(data, retries=5, backoff_factor=1):logger.info(f"上传 {data}")# 配置 Kafka 生产者kafka_config = {'bootstrap.servers': KAFKA_URL,  # 替换为你的 Kafka 地址'client.id': 'my-producer',}producer = Producer(kafka_config)topic = KAFKA_TOPIC  # 替换为你的 Kafka 主题名# 重试逻辑for attempt in range(retries):try:# 将数据转换为 JSON 格式并发送到 Kafkaproducer.produce(topic=topic, value=json.dumps(data), callback=delivery_report)producer.flush()  # 强制刷新缓冲区logger.info("数据上传成功")return  # 成功后返回except KafkaException as e:logger.error(f"Kafka error occurred: {e}")except Exception as e:logger.error(f"Unexpected error: {e}")# 等待后重试wait_time = backoff_factor * (2 ** attempt)logger.info(f"等待 {wait_time} 秒后重试...")time.sleep(wait_time)logger.error("所有重试均失败")

http/https直传

def post_with_retries_center(data, retries=5, backoff_factor=1):URL_CENTER = "192.168.10.10" #填自己的地址logger.info(f"上传{data}")session = requests.Session()headers = {'Content-Type': 'application/json',}#  定义重试策略retry_strategy = Retry(total=retries,status_forcelist=[404, 500, 502, 503, 504],allowed_methods=["POST"],backoff_factor=backoff_factor)#  创建适配器并将其安装到会话对象中adapter = HTTPAdapter(max_retries=retry_strategy)session.mount("http://", adapter)session.mount("https://", adapter)try:response = requests.post(URL_CENTER, json=data, headers=headers)response.raise_for_status()  # 如果响应状态码不是200,则抛出异常except requests.exceptions.ConnectionError as conn_err:logger.error(f"Connection error occurred: {conn_err}")except requests.exceptions.Timeout as timeout_err:logger.error(f"Timeout error occurred: {timeout_err}")except requests.exceptions.RequestException as req_err:logger.error(f"An error occurred: {req_err}")except Exception as e:logger.error(f"Unexpected error: {e}")return None

方案二技术实现

在Linux服务器上安装服务,抓取到数据包后本地处理,直接将结果发至中央

流程为

  1. linux服务器开启tcpdump抓包
  2. linux服务器分析包,获取私网、公网流量结果
  3. linux服务器推送消息中间件
  4. 中央消费者对消息消费并入库

方案二与方案一代码几乎相同,只是职权不同,原linux不需要处理包,现在需要处理,所以处理程序应该在linux服务器上,处理后推送到消息中间件,由中央消费者进行消费即可。代码片段这里就不贴了

结尾

本文章内容适用于基于linux系统的流量包分析,方案本身是完整闭环的。其中对包的处理快慢以及对公、私网的ip段判断、消息推送的方式这些是可以持续优化的点。由于方案本身具备一定商用价值,故没有贴上源码,若读者需要可联系博主提供。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/483362.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【CSS in Depth 2 精译_066】11.2 颜色的定义(上)

当前内容所在位置(可进入专栏查看其他译好的章节内容) 第四部分 视觉增强技术 ✔️【第 11 章 颜色与对比】 ✔️ 11.1 通过对比进行交流 11.1.1 模式的建立11.1.2 还原设计稿 11.2 颜色的定义 ✔️ 11.2.1 色域与色彩空间11.2.2 深入理解颜色表示法 文…

论文导读 I RAFT:使语言模型适应特定领域的RAG

摘要 随着大语言模型(LLMs)的发展,这些模型在广泛的任务中展现出了卓越的性能。然而,当这些模型应用于特定领域时,如何有效融入新信息仍然是一个未解决的问题。本文提出了检索增强微调(RAFT)&a…

华为HarmonyOS 让应用快速拥有账号能力 -- 2 获取用户头像昵称

场景介绍 如应用需要完善用户头像昵称信息,可使用Account Kit提供的头像昵称授权能力,用户允许应用获取头像昵称后,可快速完成个人信息填写。以下只针对Account kit提供的头像昵称授权能力进行介绍,若要获取头像还可通过场景化控…

高校数字化运营平台解决方案:构建统一的服务大厅、业务平台、办公平台,助力打造智慧校园

教育数字化是建设教育强国的重要基础,利用技术和数据助推高校管理转型,从而更好地支撑教学业务开展。 近年来,国家多次发布政策,驱动教育行业的数字化转型。《“十四五”国家信息化规划》,推进信息技术、智能技术与教育…

华为HarmonyOS 让应用快速拥有账号能力 -- 1 华为账号一键登录

概述 华为账号一键登录是基于OAuth 2.0协议标准和OpenID Connect协议标准构建的OAuth2.0 授权登录系统,应用可以通过华为账号一键登录能力方便地获取华为账号用户的身份标识和手机号,快速建立应用内的用户体系。 优势: 利用系统账号的安全…

C语言:指针与数组

一、. 数组名的理解 int arr[5] { 0,1,2,3,4 }; int* p &arr[0]; 在之前我们知道要取一个数组的首元素地址就可以使用&arr[0],但其实数组名本身就是地址,而且是数组首元素的地址。在下图中我们就通过测试看出,结果确实如此。 可是…

是什么阻断了kafka与zk的链接?

转载说明:如果您喜欢这篇文章并打算转载它,请私信作者取得授权。感谢您喜爱本文,请文明转载,谢谢。 问题描述: 前几天部署一套环境,先把zk集群起来了,之后第二天在启动kafka的时候,…

MAUI APP开发蓝牙协议的经验分享:与跳绳设备对接

在开发MAUI应用程序时,蓝牙协议的应用是一个重要的环节,尤其是在需要与外部设备如智能跳绳进行数据交换的场景中。以下是我在开发过程中的一些经验和心得,希望能为你的项目提供帮助。 1. 蓝牙协议基础 蓝牙协议是无线通信的一种标准&#x…

算法日记 40 day 单调栈

最后两题了,直接上题目。 题目:接雨水 42. 接雨水 - 力扣(LeetCode) 给定 n 个非负整数表示每个宽度为 1 的柱子的高度图,计算按此排列的柱子,下雨之后能接多少雨水。 示例 1: 输入&#xff1…

浏览器渲染原理

渲染原理 第一步解析Html第二步样式计算第三步布局第四步分层第五步绘制第六步分块第七步光栅化第八步画常见面试题什么是回流reflow?什么是重绘repaint? 当浏览器的网络线程收到HTML文档之后,会产生一个渲染任务并且会将其传递给渲染主线程的…

嵌入式系统应用-LVGL的应用-平衡球游戏 part2

平衡球游戏 part2 4 mpu60504.1 mpu6050 介绍4.2 电路图4.3 驱动代码编写 5 游戏界面移植5.1 移植源文件5.2 添加头文件 6 参数移植6.1 4 mpu6050 4.1 mpu6050 介绍 MPU6050是一款由InvenSense公司生产的加速度计和陀螺仪传感器,广泛应用于消费电子、机器人等领域…

ELK的Filebeat

目录 传送门前言一、概念1. 主要功能2. 架构3. 使用场景4. 模块5. 监控与管理 二、下载地址三、Linux下7.6.2版本安装filebeat.yml配置文件参考(不要直接拷贝用)多行匹配配置过滤配置最终配置(一、多行匹配、直接读取日志文件、EFK方案&#…

JS实现高效导航——A*寻路算法+导航图简化法

一、如何实现两点间路径导航 导航实现的通用步骤,一般是: 1、网格划分 将地图划分为网格,即例如地图是一张图片,其像素为1000*1000,那我们将此图片划分为各个10*10的网格,从而提高寻路算法的计算量。 2、标…

【分页查询】.NET开源 ORM 框架 SqlSugar 系列

💥 .NET开源 ORM 框架 SqlSugar 系列 🎉🎉🎉 【开篇】.NET开源 ORM 框架 SqlSugar 系列【入门必看】.NET开源 ORM 框架 SqlSugar 系列【实体配置】.NET开源 ORM 框架 SqlSugar 系列【Db First】.NET开源 ORM 框架 SqlSugar 系列…

AI - 谈谈RAG中的查询分析(2)

AI - 谈谈RAG中的查询分析(2) 大家好,RAG中的查询分析是比较有趣的一个点,内容丰富,并不是一句话能聊的清楚的。今天接着上一篇,继续探讨RAG中的查询分析,并在功能层面和代码层面持续改进。 功…

Python 入门教程(2)搭建环境 | 2.4、VSCode配置Node.js运行环境

文章目录 一、VSCode配置Node.js运行环境1、软件安装2、安装Node.js插件3、配置VSCode4、创建并运行Node.js文件5、调试Node.js代码 一、VSCode配置Node.js运行环境 1、软件安装 安装下面的软件: 安装Node.js:Node.js官网 下载Node.js安装包。建议选择L…

redis核心命令全局命令 + redis 常见的数据结构 + redis单线程模型

文章目录 一. 核心命令1. set2. get 二. 全局命令1. keys2. exists3. del4. expire5. ttl6. type 三. redis 常见的数据结构及内部编码四. redis单线程模型 一. 核心命令 1. set set key value key 和 value 都是string类型的 对于key value, 不需要加上引号, 就是表示字符串…

哈希及其模拟实现

1.哈希的概念 顺序结构以及平衡树中,元素的关键码与其存储位置之间没有对应的关系。因此,在查找一个元素时,必须要经过关键码的多次比较。顺序查找的时间复杂度为O(N),平衡树中为树的高度,即O(log_2 N),搜…

k8s,声明式API对象理解

命令式API 比如: 先kubectl create,再replace的操作,我们称为命令式配置文件操作 kubectl replace的执行过程,是使用新的YAML文件中的API对象,替换原有的API对象;而kubectl apply,则是执行了一…

开源ISP介绍(1)——开源ISP的Vivado框架搭建

开源github链接:bxinquan/zynq_cam_isp_demo: 基于verilog实现了ISP图像处理IP 国内Gitee链接:zynq_cam_isp: 开源ISP项目 基于以上开源链接移植项目到正点原子领航者Zynq7020开发板,并对该项目的Vivddo工程进行架构详解,后续会…