Python数据处理——re库与pydantic的使用总结与实战,处理采集到的思科ASA防火墙设备信息

目录

Python正则表达式re库的基本用法

引入re库

各函数功能

总结

使用方法举例

正则表达式语法与书写方式

正则表达式的常用操作符

思科ASA防火墙数据

数据1

数据2

书写正则表达式

Python中pydantic的使用

导入基础数据模板

根据数据采集目标定义Pydantic数据类型

数据分析采集函数 


Python正则表达式re库的基本用法

引入re库

import re

各函数功能

函数用法
re.match()从字符串起始位置用正则表达式进行数据匹配,匹配到一个数据立即结束并返回第一个匹配到的结果,返回一个Match object对象
re.search()从字符串起始位置用正则表达式进行数据匹配,返回一个Match object对象,含有第一个匹配到数据的位置
re.findall()从字符串起始位置用自定义的正则表达式进行数据匹配,一直尝试匹配到字符串结束,返回匹配到的所有子串,是一个列表
re.split()从字符串起始位置用自定义的正则表达式进行数据匹配,一直尝试匹配到字符串结束,将字符串按正则表达式匹配到的结果进行分割,返回被分割的子串,是一个列表
re.sub()从字符串起始位置用自定义的正则表达式进行数据匹配,一直尝试匹配到字符串结束,将字符串中按正则表达式匹配到的子串全部替换为自定义的内容,返回被替换后的字符串
re.finditer()从字符串起始位置用自定义的正则表达式进行数据匹配,一直尝试匹配到字符串结束,返回一个callable_iterator object对象,可以被迭代,里面每个数据是Match object对象

总结

其实在数据处理中我使用re.search()居多,在使用上也很符合查找内容的惯性思维,把需要采集的数据拆分书写成一个个短小的正则表达式,写出的代码也容易维护,缺点是会对字符串进行多次匹配,会浪费资源,如果设涉及大量数据的分析采集推荐使用re.match()或者是混合使用其他函数,并书写单个较长的正则表达式可以达到节省资源的目的

使用方法举例

data = re.match(正则表达式,需要处理的字符串,匹配选项)

在Python中正则表达式的格式为:r'...正则表达式内容...'

例如data = re.match(r'[0-9]', '123')

正则表达式语法与书写方式

正则表达式的常用操作符

其余可自行搜索用法,本文主要涉及这几个

符号说明
.匹配除换行符(\n、\r)之外的任何单个字符
\d匹配一个数字,等价于[0-9]
\D匹配一个任何非数字字符,等价于[^0-9]
\s匹配一个任何空白字符,如空格
\S匹配一个任何非空白字符
\n匹配一个换行符
*匹配前一个字符0次或无限次
+匹配前一个字符1次或无限次
()用于标明匹配组,方便用一个正则表达式匹配多个内容

在学习如何书写正则表达式之前需要先了解下需要处理的数据

思科ASA防火墙数据

数据1

用Python脚本连接思科ASA防火墙执行如下命令获取(此文章不介绍如何采集数据,数据均为样例,和真实环境也会有出入)

show interface ip brief

 回显内容

Interface              IP-Address      OK? Method Status                Protocol
Ethernet0/0            192.168.1.1     YES manual up                    up
Ethernet0/1            10.1.1.1        YES manual up                    up

数据2

执行命令 

show interface detail

回显内容 

Interface Ethernet0 "outside", is up, line protocol is upHardware is i82558, BW 100 Mbps, DLY 100 usecAuto-Duplex(Full-duplex), Auto-Speed(100 Mbps)Input flow control is unsupported, output flow control is unsupportedMAC address 001e.4f9c.2345, MTU 1500IP address 172.16.0.1, subnet mask 255.255.255.128132463 packets input, 9245212 bytes, 0 no bufferReceived 24 broadcasts, 0 runts, 0 giants0 input errors, 0 CRC, 0 frame, 0 overrun, 0 ignored, 0 abort0 pause input, 0 resume input0 L2 decode drops238982 packets output, 15789347 bytes, 0 underruns0 pause output, 0 resume output0 output errors, 0 collisions, 0 interface resets0 babbles, 0 late collisions, 0 deferred0 lost carrier, 0 no carrier0 input reset drops, 0 output reset dropsinput queue (curr/max packets): hardware (0/1) software (0/8)output queue (curr/max packets): hardware (0/9) software (0/1)Traffic Statistics for "outside":132463 packets input, 9245212 bytes238982 packets output, 15789347 bytes3258 packets dropped1 minute input rate 2 pkts/sec,  110 bytes/sec1 minute output rate 7 pkts/sec,  980 bytes/sec1 minute drop rate, 0 pkts/sec5 minute input rate 3 pkts/sec,  150 bytes/sec5 minute output rate 8 pkts/sec,  1012 bytes/sec5 minute drop rate, 0 pkts/secControl Point Interface States:Interface number is 1Interface config status is activeInterface state is activeInterface Ethernet1 "inside", is up, line protocol is upHardware is i82558, BW 100 Mbps, DLY 100 usecAuto-Duplex(Full-duplex), Auto-Speed(100 Mbps)Input flow control is unsupported, output flow control is unsupportedMAC address 001e.4f9c.6789, MTU 1500IP address 192.168.10.1, subnet mask 255.255.255.0204892 packets input, 14235129 bytes, 0 no bufferReceived 18 broadcasts, 0 runts, 0 giants0 input errors, 0 CRC, 0 frame, 0 overrun, 0 ignored, 0 abort0 pause input, 0 resume input0 L2 decode drops174562 packets output, 20982145 bytes, 0 underruns0 pause output, 0 resume output0 output errors, 0 collisions, 0 interface resets0 babbles, 0 late collisions, 0 deferred0 lost carrier, 0 no carrier0 input reset drops, 0 output reset dropsinput queue (curr/max packets): hardware (0/1) software (0/10)output queue (curr/max packets): hardware (2/10) software (0/1)Traffic Statistics for "inside":204892 packets input, 14235129 bytes174562 packets output, 20982145 bytes5124 packets dropped1 minute input rate 4 pkts/sec,  360 bytes/sec1 minute output rate 6 pkts/sec,  890 bytes/sec1 minute drop rate, 1 pkts/sec5 minute input rate 5 pkts/sec,  480 bytes/sec5 minute output rate 7 pkts/sec,  910 bytes/sec5 minute drop rate, 0 pkts/secControl Point Interface States:Interface number is 2Interface config status is activeInterface state is active

书写正则表达式

先来一个简单的例子,在执行了show interface ip brief后会显示出接口状态和其ip地址等信息

Interface              IP-Address      OK? Method Status                Protocol
Ethernet0/0            192.168.1.1     YES manual up                    up
Ethernet0/1            10.1.1.1        YES manual up                    up

 现在假设我只对接口名称和其对应的ip地址感兴趣,想要用正则表达式将他们提取出来可以使用如下正则表达式,这个表达式还有一些小瑕疵,下面来拆解看一下他的含义:

推荐正则表达式辅助书写和练习的工具:regex101: build, test, and debug regex

(\S+)\s*(\S+).* 

其中被"()"括起来的内容为我们想要捕获的数据,获取第一个"()"中的内容可对返回的对象使用.group(1)方法来获取以此类推获取第二个可用.group(2)来获取

\S匹配一个任何非空白字符,后面添加“+”符号代表连续匹配1个以上的非空白字符:

\s匹配一个任何空白字符,如空格,后面加“*”代表连续匹配0个以上的非空白字符:

将这两者组合起来:

发现除了名称和ip之外还匹配到了其他内容,可以在后面加上“.*”直接代替其余内容直到出现换行符\n:

发现还有一个小细节没考虑到,会匹配到上面的描述字段“Interface” 、“IP-Address”,这时可以观察感想要匹配数据的特征来做一个排除,“Ethernet0/0”可以拆分成多个字符+一个数字的形式,即拆分成:“Ethernet0/”和“0”,其余数据也适用,都可以堪称一个字符+结尾一个数字的格式,这时可以做一下修改,把“(\S+)”修改为“(\S+\d)”:

这样就可以成功获取到想要的数据了,可以编写python代码来试验一下:

import redata = """
Interface              IP-Address      OK? Method Status                Protocol
Ethernet0/0            192.168.1.1     YES manual up                    up
Ethernet0/1            10.1.1.1        YES manual up                    up
"""interface_match = re.search(r'(\S+\d)\s*(\S+).*', data)print(interface_match.group(1))
print(interface_match.group(2))

输出结果:

 

re.search方法会在查找到第一个数据后停止匹配,返回第一次匹配到的结果,如果想要捕获所有的Interface数据,可以先将data按行进行分割再进行匹配:

import redata = """
Interface              IP-Address      OK? Method Status                Protocol
Ethernet0/0            192.168.1.1     YES manual up                    up
Ethernet0/1            10.1.1.1        YES manual up                    up
"""data_list = data.split('\n')
for line in data_list:interface_match = re.search(r'(\S+\d)\s*(\S+).*', line)if interface_match:print(interface_match.group(1))print(interface_match.group(2))

输出结果:

 

Python中pydantic的使用

导入基础数据模板

from pydantic import BaseModel

一个简单的例子

pydantic在python开发中可以将数据类型明确,也方便对数据的校验

from pydantic import BaseModelclass User(BaseModel):id: intname: strpermission: list[str]user = User(id=123456, name='admin',permission=['can_add_student','can_edit_student'])print(user)

根据数据采集目标定义Pydantic数据类型

# 定义Pydantic数据类型
class Count(BaseModel):packets: intpackets_rate: strbytes: intbytes_rate: strclass Input(Count):passclass Output(Count):passclass Dropped(Count):passclass InterfaceStatus(BaseModel):input_status: Inputoutput_status: Outputdrop_status: Droppedclass InterfaceBaseInfo(BaseModel):name: strstatus: strprotocol_status: strip_address: strsubnet_mask: strmac_address: strbandwidth: strdelay: strmtu: inthardware_type: strclass Interface(BaseModel):base_info: InterfaceBaseInfodetail_info: InterfaceStatusclass ASAData(BaseModel):outside_interface: Interfaceinside_interface: Interface

数据分析采集函数 

和上面的思路一样,只是多了很多需要匹配的数据

def get_asa_data():# 声明interfaces = {}# 处理interface_detail返回数据for paragraph in interface_detail.split("Interface"):if not paragraph.strip():continue# 正则匹配base_infoname_match = re.search(r'(\S+)\s"(\S+)",', paragraph)mac_mtu_match = re.search(r'MAC\saddress\s+(\S+),\sMTU\s(\d+)', paragraph)ip_match = re.search(r'IP\saddress\s(\S+),.*mask\s(\S+)', paragraph)hardware_match = re.search(r'Hardware\sis\s(.*),\sBW\s(.*),\sDLY\s(.*)', paragraph)if not name_match or not mac_mtu_match or not ip_match or not hardware_match:continuebase_info = {"name": name_match.group(1),"status": "None",  # 先做初始化,后续再从interface_ip_brief中获取"protocol_status": "None","ip_address": ip_match.group(1),"subnet_mask": ip_match.group(2),"mac_address": mac_mtu_match.group(1),"bandwidth": hardware_match.group(2),"delay": hardware_match.group(3),"mtu": int(mac_mtu_match.group(2)),"hardware_type": hardware_match.group(1),}# 正则匹配detail_infoinput_match = re.search(r'(\d+)\spackets\sinput,\s(\d+)\sbytes', paragraph)input_rate_match = re.search(r'5\sminute\sinput\srate\s(.*),\s+(.*)', paragraph)output_match = re.search(r'(\d+)\spackets\soutput,\s(\d+)\sbytes', paragraph)output_rate_match = re.search(r'5\sminute\soutput\srate\s(.*),\s+(.*)', paragraph)drop_match = re.search(r'(\d+)\spackets\sdropped', paragraph)drop_rate_match = re.search(r'5\sminute\sdrop\srate,\s(.*)', paragraph)detail_info = {"input_status": Input(packets = int(input_match.group(1)) if input_match else -1,bytes = int(input_match.group(2)) if input_match else -1,packets_rate = input_rate_match.group(1) if input_rate_match else "None",bytes_rate = input_rate_match.group(2) if input_rate_match else "None",),"output_status": Output(packets = int(output_match.group(1)) if output_match else -1,bytes = int(output_match.group(2)) if output_match else -1,packets_rate = output_rate_match.group(1) if output_rate_match else "None",bytes_rate = output_rate_match.group(2) if output_rate_match else "None",),"drop_status": Dropped(packets = int(drop_match.group(1)) if drop_match else -1,bytes = -1,packets_rate = drop_rate_match.group(1) if drop_rate_match else "None",bytes_rate = "None",),}interface_name = name_match.group(2)interfaces[interface_name] = Interface(base_info=InterfaceBaseInfo(**base_info),detail_info=InterfaceStatus(**detail_info),)# 处理interface_ip_brie返回数据for line in interface_ip_brief.strip().split("\n"):match = re.search(r'(\S+)\s+(\S+)\s+\S+\s+\S+\s+(\S+)\s+(\S+)', line)if match:name, ip, status, protocol_status = match.groups()for interface in interfaces.values():if interface.base_info.ip_address == ip:interface.base_info.status = statusinterface.base_info.protocol_status = protocol_statusasa_data = ASAData(outside_interface=interfaces.get("outside"),inside_interface=interfaces.get("inside"),)# 将数据格式化为json返回return(asa_data.json())

 打印输出结果

# 打印输出结果
print(get_asa_data())

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/495821.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

「Python数据科学」标量、向量、矩阵、张量与多维数组的辨析

引言 在数据科学中,有很多概念,其中,最容易搞混的就是标量、向量、矩阵、张量了。具体到这些概念的落地实现,又与多维数组有着密不可分的联系。 本文就来尝试对这些概念进行简要地梳理,从而更加清晰地理解这些概念及…

iOS开发代码块-OC版

iOS开发代码块-OC版 资源分享资源使用详情Xcode自带代码块自定义代码块 资源分享 自提: 通过网盘分享的文件:CodeSnippets 2.zip 链接: https://pan.baidu.com/s/1Yh8q9PbyeNpuYpasG4IiVg?pwddn1i 提取码: dn1i Xcode中的代码片段默认放在下面的目录中…

第十七届山东省职业院校技能大赛 中职组“网络安全”赛项任务书正式赛题

第十七届山东省职业院校技能大赛 中职组“网络安全”赛项任务书-A 目录 一、竞赛阶段 二、竞赛任务书内容 (一)拓扑图 (二)模块A 基础设施设置与安全加固(200分) (三)B模块安全事件响应/网络安全数据取证/…

Git(11)之log显示支持中文

Git(11)之log显示支持中文 Author:Once Day Date:2024年12月21日 漫漫长路有人对你微笑过嘛… 参考文档:GIT使用log命令显示中文乱码_gitlab的log在matlab里显示中文乱码-CSDN博客 全系列文章可查看专栏: Git使用记录_Once_day的博客-CSD…

rabbitmq相关使用

使用rabbitmq实现异步解耦 使用步骤&#xff1a; 1、pom依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency> 2、yml配置文件 spring:rabbitmq:host: 12…

oracle linux8.10+ oracle 23ai安装

介质准备&#xff1a; 数据库23ai https://edelivery.oracle.com 上述网站下载基础版本&#xff0c;本次未使用。 本次是安装了带补丁的版本&#xff1a; Database Release Update 23.6.0.24.10 GoldImage表示带补丁用于直接安装的软件包 查找888.1对应Primary Note for …

使用helm安装canal-server和canal-admin

1.前置条件&#xff1a; 需要电脑有helm kubectl 如果没有的话需要安装环境 2.需要拉取canal-server和canal-admin镜像 拉取镜像的时候可能存在拉取不下来的情况&#xff0c;需要配置&#xff1a; /etc/docker/daemon.json {"registry-mirrors": ["https://do…

使用ForceBindIP绑定应用到指定IP

前言 使用ForceBindIP工具&#xff0c;用户可以轻松地将特定应用程序绑定到指定的IP地址&#xff0c;从而确保应用程序的网络连接通过指定的网络适配器进行。通过在命令提示符下运行ForceBindIP并指定IP地址和应用程序的完整路径&#xff0c;用户能够控制应用程序的网络流量&a…

Windows下安装Rabbit MQ

一、安装环境&#xff1a; 系统&#xff1a;windows11; 环境配置安装&#xff1a;otp_win64_25.3.2.14.exe&#xff08;erlang类库&#xff09;&#xff1b; 服务应用安装&#xff1a;rabbitmq-server-3.12.4.exe&#xff1b; 二、erlang环境&#xff1a; 1.执行…

层序遍历练习

层次遍历 II 给定一个二叉树&#xff0c;返回其节点值自底向上的层次遍历。 &#xff08;即按从叶子节点所在层到根节点所在的层&#xff0c;逐层从左向右遍历&#xff09; 思路 相对于102.二叉树的层序遍历&#xff0c;就是最后把result数组反转一下就可以了。 C代码&…

[Python机器学习]:Anaconda3实践环境安装和使用

文章目录 一&#xff1a;机器学习基本环境安装二&#xff1a;设置环境变量三&#xff1a;检查结果四&#xff1a;创建自己的虚拟环境1&#xff1a;查看虚拟环境: conda env list2&#xff1a;创建新环境:conda create --name envname python3.83&#xff1a;删除环境:conda rem…

重温设计模式--观察者模式

文章目录 观察者模式&#xff08;Observer Pattern&#xff09;概述观察者模式UML图作用&#xff1a;实现对象间的解耦支持一对多的依赖关系易于维护和扩展 观察者模式的结构抽象主题&#xff08;Subject&#xff09;&#xff1a;具体主题&#xff08;Concrete Subject&#xf…

【更新】Docker新手入门教程2:在Windows系统通过compose创建多个mysql镜像并配置应用

文章目录 前言一、运行Docker init生成docker配置文件二、修改创建镜像的配置文件1、添加镜像挂载点 三、【拉取镜像】四、生成Docker 镜像查看生成的镜像 五、修改Compose配置文件3、配置Mysql六、生成Docker容器七、检查容器创建状态总结 前言 在window下通过Docker创建mysq…

lxml 解析xml\html

from lxml import etree# XML文档示例 xml_doc """ <root><book><title>Python编程指南</title><author>张三</author></book><book><title>Python高级编程</title><author>李四</autho…

FFmpeg在python里推流被处理过的视频流

链式算法处理视频流 视频源是本地摄像头 # codinggbk # 本地摄像头直接推流到 RTMP 服务器 import cv2 import mediapipe as mp import subprocess as sp# 初始化 Mediapipe mp_drawing mp.solutions.drawing_utils mp_drawing_styles mp.solutions.drawing_styles mp_holis…

从零开始k8s-部署篇(未完待续)

从零开始k8s 1.部署k8s-部署篇 1.部署k8s-部署篇 本次部署完全学习于华子的博客点击此处进入华子主页 K8S中文官网&#xff1a;https://kubernetes.io/zh-cn 笔者从零开始部署的k8s&#xff0c;部署前置条件为 1.需要harbor仓库&#xff0c;存放镜像&#xff0c;拉取镜像&am…

Pytorch | 利用AI-FGTM针对CIFAR10上的ResNet分类器进行对抗攻击

Pytorch | 利用AI-FGTM针对CIFAR10上的ResNet分类器进行对抗攻击 CIFAR数据集AI-FGTM介绍算法流程初始化迭代更新&#xff08; t 0 t 0 t0 到 T − 1 T - 1 T−1&#xff09;迭代完成 AI-FGTM代码实现AI-FGTM算法实现攻击效果 代码汇总aifgtm.pytrain.pyadvtest.py 之前已经…

视频监控平台:Liveweb视频汇聚融合平台智慧安防视频监控应用方案

Liveweb是一款功能强大、灵活部署的安防视频监控平台&#xff0c;支持多种主流标准协议&#xff0c;包括GB28181、RTSP/Onvif、RTMP等&#xff0c;同时兼容海康Ehome、海大宇等厂家的私有协议和SDK接入。该平台不仅提供传统安防监控功能&#xff0c;还支持接入AI智能分析&#…

【Linux系列】Shell 脚本中的条件判断:`[ ]`与`[[ ]]`的比较

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

【LLM论文日更】| 训练大型语言模型在连续潜在空间中进行推理

论文&#xff1a;https://arxiv.org/pdf/2412.06769代码&#xff1a;暂未开源机构 &#xff1a;Meta领域&#xff1a;思维链发表&#xff1a;arxiv 研究背景 研究问题&#xff1a;这篇文章要解决的问题是如何在大语言模型&#xff08;LLMs&#xff09;中实现一种新的推理范式&…