建模杂谈系列256 规则函数化改造

说明

之前尝试用FastAPI来构造规则,碰到的问题是由于请求量过大(TPS > 1000), 从而导致微服务端口资源耗尽。所以现在的point是:

  • 1 如何使用函数来替代微服务(同时要保留使用微服务的优点)
  • 2 进一步抽象并规范规则的执行
  • 3 等效合并规则的方法

内容

0 机制讨论

过去在使用tornado作为后端服务的时候,是没有碰到端口耗尽的问题的,也许是tornado本身采取的是长连接,更适合大批量数据请求的后端任务。
FastAPI更适合做短、平的IO类异步需求,不可以用于级联,TPS大约400-1200的样子。
这次的业务场景是实体匹配,我们需要从原文中提取出实体,然后完成匹配。

数据样本:

ent_list = ['基金', '美芯晟', '高新兴', '骏成科技', '证券时报', '深市主板', '创业板', '沪市', '科创板', '计算机', 
'机械设备', '共有3', '潍柴动力', '乐心医疗', '嘉曼服饰', '敏芯股份', '渝开发', '长虹美菱', '德联集团', '数据宝', 
'中航西飞', '顺络电子', '基金家数', '华利集团', '杰瑞股份', '邦彦技术', '兴瑞科技', '深天马', '漫步', 
'金力永磁', '太阳能', '普蕊斯', '德方纳米', '华锐精密', '伊之密', '西子洁能', '陕西华达', '浙江鼎力', '诺瓦星云', '远光']original_text_half_width = '''昨日基金共对31家公司进行调研,扎堆调研美芯晟、高新兴、骏成科技等。证券时报•数据宝统计,6月5日共40家公司被机构调研,按调研机构类型看,基金参与31家公司的调研活动,其中,10家以上基金扎堆调研公司共6家。美芯晟最受关注,参与调研的基金达27家;高新兴、骏成科技等分别获18家、15家基金集体调研。基金参与调研的公司中,按所属板块统计,深市主板公司有13家,创业板公司有13家,沪市主板公司有1家,科创板公司有4家。所属行业来看,基金调研的公司共涉及13个行业,所属电子行业最多,有7家公司上榜;计算机、机械设备等紧随其后,分别有4家、4家公司上榜。从基金调研公司的A股总市值统计,总市值在500亿元以上的共有3家,其中总市值超千亿元的有潍柴动力等,总市值不足100亿元的有17家,分别是乐心医疗、嘉曼服饰、敏芯股份等。市场表现上,基金调研股中,近5日上涨的有10只,涨幅居前的有敏芯股份、高新兴、骏成科技等,涨幅为21.46%、19.43%、13.83%;下跌的有21只,跌幅居前的有渝开发、长虹美菱、德联集团等,跌幅为12.31%、9.92%、9.22%。数据宝统计,基金参与调研股中,近5日资金净流入的有12只,中航西飞近5日净流入资金1.53亿元,主力资金净流入最多;净流入资金较多的还有高新兴、顺络电子等,净流入资金分别为8217.28万元、3140.67万元。(数据宝)6月5日基金调研公司一览代码简称基金家数最新收盘价(元)近5日涨跌幅(%)行业688458美芯晟27 35.85 -4.48电子300098高新兴18 3.75 19.43计算机301106骏成科技15 32.42 13.83电子002138顺络电子14 25.27 7.76电子300979华利集团14 67.90 1.19纺织服饰300803指南针13 42.92 -0.60计算机002353杰瑞股份9 34.33 -2.05机械设备301276嘉曼服饰8 22.71 -0.74纺织服饰688132邦彦技术8 18.55 5.64国防军工688286敏芯股份6 44.60 21.46电子000338潍柴动力6 15.74 -2.24汽车002937兴瑞科技5 20.93 1.45电子000514渝开发4 3.49 -12.31房地产000050深天马A4 7.31 -2.40电子002351漫步者3 12.73 -0.08电子300748金力永磁2 14.06 -2.77有色金属000591太阳能2 5.05 -6.31公用事业301257普蕊斯2 40.05 -4.53医药生物000768中航西飞2 25.23 4.69国防军工300769德方纳米2 33.88 -2.98电力设备300562乐心医疗1 8.61 -6.62医药生物688059华锐精密1 54.53 -6.79机械设备002666德联集团1 3.94 -9.22基础化工300415伊之密1 22.34 1.79机械设备002534西子洁能1 10.99 -4.18电力设备301517陕西华达1 61.60 2.56国防军工301362民爆光电1 34.00 -7.34家用电器603338浙江鼎力1 61.86 -4.93机械设备301589诺瓦星云1 219.68 -8.31计算机000521长虹美菱1 8.90 -9.92家用电器002063远光软件1 5.70 -1.55计算机注:本文系新闻报道,不构成投资建议,股市有风险,投资需谨慎。'''

在逻辑上,我们会按照实际情况设计分级,在程序上,我们要有一个合并的逻辑。这种逻辑要简单,不要offend逻辑。

1 现有的服务

采用“WaterFall”的方法逐步批量的处理并分流数据。

一条规则是如此

# reject
@app.post("/r000/")
async def r000(justent:JustEnt):the_ent = justent.some_entthe_result = RuleResult()try:if judge_existence(the_ent, word_list=r0_exe_clude_list):the_result.status = 'reject'else:the_result.status = 'pass'return the_result.dict()except Exception as e:raise HTTPException(status_code=400, detail=str(e))

在发起调用时,采用异步方式,每次根据请求的目标先品出参数,然后将渠道的结果进行解析。

import time 
def waterfall_api_mode(last_fall, rule_name ,reject_list = None, get_list = None, mappling_list = None, raw = None , base_url = None):next_fall = []last_ent_list = last_fall pure_rule_url = rule_name + '/'if len(last_ent_list):rule_url = base_url + pure_rule_url# api modetick1 = time.time()task_list = []for ent in last_ent_list:tem_dict = {}tem_dict['task_id'] =  ent tem_dict['url'] = rule_urlif raw is None :tem_dict['json_params'] = {'some_ent':ent}else:tem_dict['json_params'] = {'some_ent':ent,'raw':raw}task_list.append(tem_dict)rule_res = asyncio.run(json_player(task_list, concurrent = 10))# 解析结果,保留passfor tem_res in rule_res:for k,v in tem_res.items():# print(k,v)if v['status'] == 'pass':next_fall.append(k)elif v['status'] == 'get':if get_list is not None :get_list.append(v['data'])if mappling_list is not None :mappling_list.append({'ent':k,'mapping_ent': v['data']})elif v['status'] == 'reject':if reject_list is not None :reject_list.append(k)tick2 = time.time()print('takes %.2f ' %(tick2-tick1))return next_fall

在批量调用规则时,采用几乎一样的形式即可,这是非常简洁的地方。

    #  ============= fall of short # r100_1next_fall_short = waterfall_api_mode(next_fall_short, 'r100_1',base_url = base_url)# r100next_fall_short = waterfall_api_mode(next_fall_short, 'r100', get_list = mr.short_result,mappling_list=mr.mapping_list ,base_url = base_url)# r102next_fall_short = waterfall_api_mode(next_fall_short, 'r102', get_list = mr.short_result,mappling_list=mr.mapping_list ,base_url = base_url)# r102_1next_fall_short = waterfall_api_mode(next_fall_short, 'r102_1', get_list = mr.short_result,mappling_list=mr.mapping_list ,base_url = base_url)# r103next_fall_short = waterfall_api_mode(next_fall_short, 'r103', raw = original_text_half_width,base_url = base_url)# r104next_fall_short = waterfall_api_mode(next_fall_short, 'r104',base_url = base_url)# r105next_fall_short = waterfall_api_mode(next_fall_short, 'r105',base_url = base_url)# r106next_fall_short = waterfall_api_mode(next_fall_short, 'r106',base_url = base_url)# r107next_fall_short = waterfall_api_mode(next_fall_short, 'r107',base_url = base_url)# r200next_fall_short = waterfall_api_mode(next_fall_short, 'r200',base_url = base_url)# r201next_fall_short = waterfall_api_mode(next_fall_short, 'r201',base_url = base_url)# r202next_fall_short = waterfall_api_mode(next_fall_short, 'r202',base_url = base_url)# r203next_fall_short = waterfall_api_mode(next_fall_short, 'r203',base_url = base_url)# r299next_fall_short = waterfall_api_mode(next_fall_short, 'r299', get_list = mr.short_result,mappling_list=mr.mapping_list,base_url = base_url )

觉得还不错,需要保持的地方:

  • 1 数据规范。使用pydantic,这个可以继续保持
  • 2 waterfall_api_mode ,可以作为waterfall_func_mode, 且这次可以规定输出为4部分:get、pass、reject、error
  • 3 执行时,每条规则除了顺序之外,应该还有层次,实现BFS。规则分为若干模式,例如 is_in , is_not_in, 在每个层之间的同类规则可以合并。

2 设计与改进

诶,我突然想到了我的APIFunc。
在这里插入图片描述
总体上说,这个框架还是比较强大的,但是非常僵化,所以最终没有走向实际应用。所以我觉得完全可以进行拆解,重构。当然,里面有一部分问题的解决还是蛮厉害的,反正这一会我想不出来。

有几块内容是需要添加上的:

  • 1 logging对象:灵活的进行记录,后续会和logstash结合在一起(ELK)
  • 2 错误输出:遇到错误时发送到kafka
  • 3 shortuuid: 每次处理会生成一个shortuuid用于追溯,代表一次会话之内的

修改的部分:

  • 1 原来有很多数据的校验部分,现在可以用pydantic来控制
  • 2 BFS替代逐个的链式
  • 3 没有列式方法,全部是行式方法
  • 4 g变量:会存储额外的字典,不必完全按照df格式

优化的部分:

  • 1 修饰器方法,支持按依赖定义规则。例如 on depends of [rule1,rule2], def new rule。

保留的部分:

  • 1 reinit_data 重新初始化数据

2.1 原型部分

2.1.1 Logging
import logging
from logging.handlers import RotatingFileHandlerdef get_logger(name, lpath='/var/log/', module='default.default'):logger = logging.getLogger(name)# 防止重复添加 handlerif not logger.handlers:fpath = lpath + name + '.log'handler = RotatingFileHandler(fpath, maxBytes=100 * 1024 * 1024, backupCount=10)# 设置日志格式为 [时间] - [日志级别] - [模块名称] - 消息formatter = logging.Formatter('[%(asctime)s] - [%(levelname)s] - [{}] - %(message)s'.format(module),datefmt='%Y-%m-%d %H:%M:%S')handler.setFormatter(formatter)logger.addHandler(handler)logger.setLevel(logging.INFO)return loggerlogger = get_logger('example')# 记录不同级别的日志
logger.info('[part.a]系统启动完成')
logger.warning('[[part.b]磁盘空间不足,剩余空间小于10%')
logger.error('无法连接数据库,请检查网络设置')
logger.debug('这是调试信息,不会显示在日志中')
logger.critical('系统崩溃,立即采取措施')
2.1.2 BFS

先回忆一下过去的成果,当时的结论是:使用networkx作为核心的图计算工具,而neo4只是后端的存储backup。可以认为是pandas和mysql的关系。

在使用的时候,可以为一个项目设置一个独立的名称,这个独立的名称也就是节点的label,或者可以认为是节点的“表”。在需要的时候,可以整个读取(neo4j disk),在内存中处理(networkx memory)。

这段代码定义了一个很小的图,

import networkx as nx
import matplotlib.pyplot as plt# =======================>  图的定义
# Create a directed graph
G = nx.DiGraph()def hello():print('This is Node Running ...')G.add_node(1)
G.nodes[1]['name'] = 'MinuteData'
G.nodes[1]['run'] =  helloG.add_node(2)
G.nodes[2]['name'] = 'CaptialDataDaily'
G.nodes[2]['run'] =  helloG.add_node(3)
G.nodes[3]['name'] = 'MergeData'
G.nodes[3]['run'] = helloG.add_edge(1,3)
G.add_edge(2,3)G.add_node(4)
G.nodes[4]['name'] = 'FeatureHorizonal'
G.nodes[4]['run'] = helloG.add_edge(3,4)G.add_node(5)
G.nodes[5]['name'] = 'ImbalanceSample'
G.nodes[5]['run'] = helloG.add_edge(4,5)G.add_node(6)
G.nodes[6]['name'] = 'FeatureVertical'
G.nodes[6]['run'] = helloG.add_edge(5,6)# =======================>  图的绘画
# 获取节点标签属性
node_labels = nx.get_node_attributes(G, "name")
# pos = nx.shell_layout(G)
pos = nx.spring_layout(G)
nx.draw(G, pos, with_labels=False,  node_size=1000, font_size=12, font_color='black', arrows=True)
# 绘制节点标签
_ = nx.draw_networkx_labels(G, pos, labels=node_labels)

在这里插入图片描述
这里,可以看到节点的依赖关系可以很清楚的展示出来。

然后稍微跳一下

# 输入一个nx图,给出BFS层级字典
def BFS(some_G,max_depth = 100):layer_dict = {}# 初始化节点init_node_list = [node for node, in_degree in some_G.in_degree() if in_degree == 0]layer_dict[0] = init_node_list    # 节点的入度字典in_degree_dict = dict(some_G.in_degree())all_nodes = set(some_G.nodes)travel_nodes = set(init_node_list)# 迭代节点for i in range(1,max_depth):last_layer_nodes = layer_dict[i-1]layer_dict[i] = []for last_node in last_layer_nodes:out_nodes = list(some_G.successors(last_node))if len(out_nodes):for out_node in out_nodes:out_node_degree = in_degree_dict[out_node]out_node_degree1 = out_node_degree-1if out_node_degree1 == 0:layer_dict[i].append(out_node)travel_nodes.add(out_node)else:in_degree_dict[out_node] = out_node_degree1gap_set = all_nodes - travel_nodesif len(gap_set) ==0:breakreturn layer_dictBFS(G)
{0: [1, 2], 1: [3], 2: [4], 3: [5], 4: [6]}

给到一个定义好的图,通过BFS可以很快把层级梳理出来。

所以,将原来的修饰器改一改,将节点的依赖关系在启动修饰器的时候解释。函数可以在修饰器下临时定义,也可以引用已经编辑好的。现在已经具备了使用形式化参数(如slice_list_batch)来调用函数了,既有本地的包,也有微服务。

2.1.3 会话

我们将程序的每一次执行视为一次会话。

将程序的每一次执行视为一次会话是一种有用的抽象,可以帮助我们追踪、分析和管理程序的行为。每次执行都可以被认为是一个独立的会话,这些会话可以包括一系列输入、处理和输出。以下是将程序执行视为会话时的一些要点:

1. 会话的定义

  • 每次程序的执行周期(从启动到结束)被视为一个独立的会话。
  • 会话的范围可以根据程序的复杂度定义,可能包括启动、执行逻辑、处理数据、生成结果,并最终结束。

2. 会话数据

  • 输入数据:用户输入或外部数据源提供的信息。
  • 上下文信息:会话中的环境或系统状态(如用户信息、配置设置、会话 ID)。
  • 日志记录:在每个会话中生成的日志信息,帮助监控、调试和跟踪程序执行的过程。
  • 输出数据:会话完成后生成的结果或操作。

3. 会话标识

  • 每个会话可以使用唯一的标识符(例如 UUID、时间戳)来区分和追踪。
  • 日志和监控系统可以根据这个标识符来收集会话信息。

4. 会话的生命周期

  • 开始:程序启动或用户发起的操作。
  • 执行:程序的核心逻辑运行,处理输入并生成中间或最终结果。
  • 结束:程序完成执行或用户操作结束。程序可以写入日志、清理资源或返回结果。

5. 会话状态

  • 成功:程序按预期完成所有操作。
  • 失败:程序执行中出现错误或异常。
  • 中断:程序由于外部原因或用户取消而中途停止。

6. 会话管理

  • 可以通过记录每次会话的执行时间、状态、输入和输出数据,来分析系统的性能和稳定性。
  • 会话管理有助于调试(当出现问题时可以回溯某一具体会话)、分析(汇总和统计会话数据)以及优化程序。

7. 会话存储

  • 将会话数据存储到数据库、日志文件或分布式系统中,以便后续分析或复盘。

通过这种“会话”概念,能够更好地组织和管理程序的执行过程,尤其在需要跟踪状态、并发操作、或者执行历史时非常有用。

两个需要增加的点(以前没这么实施)

  • 1 生成uuid,用于生成会话的唯一ID
import uuiddef get_uuid(version=4, name=None, namespace=None):"""生成 UUID。参数:- version: UUID 版本 (1, 3, 4, 5)- name: 当使用 UUID3 或 UUID5 时,需要提供的名称- namespace: 当使用 UUID3 或 UUID5 时,需要提供的命名空间 (uuid.NAMESPACE_DNS, uuid.NAMESPACE_URL 等)返回:- 生成的 UUID 字符串"""if version == 1:# 基于时间生成 UUIDreturn uuid.uuid1()elif version == 3:if name is None or namespace is None:raise ValueError("UUID3 需要提供 name 和 namespace 参数")# 基于 MD5 哈希的命名空间 UUIDreturn uuid.uuid3(namespace, name)elif version == 4:# 生成随机的 UUIDreturn uuid.uuid4()elif version == 5:if name is None or namespace is None:raise ValueError("UUID5 需要提供 name 和 namespace 参数")# 基于 SHA-1 哈希的命名空间 UUIDreturn uuid.uuid5(namespace, name)else:raise ValueError("不支持的 UUID 版本。版本应为 1, 3, 4 或 5")
  • 2 会话数据存储

在高性能的场景下,里面增加的每一个操作可能都会导致系统的不稳定。但是,如果是必要的操作,那么也不能省。

我问了下大模型,自己也想了想,觉得还是用kafka比较合适。

python操作kafka一般使用confluent-kafka,在有些环境下安装会有点问题。例如,我在ubuntu18.04上安装时,爆了一些底层错误,类似C之类的依赖;在20.04上安装就没有问题。但总归要考虑这种环境问题差异会比较麻烦,所以我也做了一个kafka_agent,以API的形式提供kafka的访问。缺点是,json序列化的过程要加多一次。

我们来考虑当前场景时,并不是对每一个请求都发送会话数据:

  • 1 正常的执行(INFO):可以考虑按很低的概率抓取会话数据。
  • 2 错误(ERROR): 可以完全抓取,但这个类型的比例应该本身就是极低的。
  • 3 特定的抓取(DEBUG):可以在请求时用特定的字段区分,这类型的会话数据会被抓取。

总之,需要发起数据存储的概率非常低,总体上可能不到1%,所以这些额外的开销应该可以接受。反之,如果因为会话数据的存储影响了处理,说明:

  • 1 程序的水准过低,错误率太高。
  • 2 确实有必要进行并行:一边运行,一边监控。

如果是程序问题,那么就需要不断优化;如果是需要同步进行并行检查,那么就设置缓冲队列,加分布式处理。

使用kafka agent

假设topic为event_collect ,发送一个消息

import requests as req 
from pydantic import BaseModel,field_validator
import pandas as pd 
import json 
import time 
class Producer(BaseModel):servers : str raw_msg_list : list is_json : bool = True topic : str @propertydef msg_list(self):# change raw - json if self.is_json:tick1 = time.time()the_list = pd.Series(self.raw_msg_list).apply(json.dumps).to_list()print('takes %.2f for json dumps ' %(time.time() - tick1 ))return the_list else:return self.raw_msg_list

回顾一下kafka的搭建,可以使用docker-compose搭建,但是我还是比较喜欢直接用docker。

首先需要搭建zookeeper。

docker run -d --restart=always --name zookeeper -e \
ZOOKEEPER_CLIENT_PORT=2181 -e \
ZOOKEEPER_TICK_TIME=2000 -p 2181:2181 \
registry.cn-hangzhou.aliyuncs.com/andy08008/zookeeper0718:v100

然后再搭kafka,假设kafka分为内网和外网监听。
创建kafka持久化的路径

mkdir /home/data2T/kafka_data

然后创建

WAN_IP=XXXX
LAN_IP=192.168.0.159
docker run -d --name kafka \-p xxxx:xxxx \-p 9092:9092 \--link zookeeper:zk \-e HOST_IP=localhost \-e KAFKA_BROKER_ID=1 \-e KAFKA_ZOOKEEPER_CONNECT=zk:2181 \-e KAFKA_ADVERTISED_LISTENERS=INTERNAL://${LAN_IP}:9092,EXTERNAL://${WAN_IP}:xxxx \-e KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:xxxx \-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT \-e KAFKA_LISTENER_NAME=INTERNAL \-e KAFKA_LISTENER_NAME=EXTERNAL \-e KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL \-e KAFKA_LOG_DIRS=/data/kafka-logs \-v /home/data2T/kafka_data:/data/kafka-logs \registry.cn-hangzhou.aliyuncs.com/andy08008/kafka0718:v100

这时候就可以使用kafka_agent进行连接了

生产

import requests as req 
from pydantic import BaseModel,field_validator
import pandas as pd 
import json 
import time 
class Producer(BaseModel):servers : str raw_msg_list : list is_json : bool = True topic : str @propertydef msg_list(self):# change raw - json if self.is_json:tick1 = time.time()the_list = pd.Series(self.raw_msg_list).apply(json.dumps).to_list()print('takes %.2f for json dumps ' %(time.time() - tick1 ))return the_list else:return self.raw_msg_listmsg_list = [{'id':i ,'value':'abc' } for i in range(10)]
produces = Producer(servers = WAN_IP,raw_msg_list = msg_list, topic='mytest200' )import time 
tick1 = time.time()
resp = req.post(f'http://{agent_url}/send_msg/',json = produces.dict()).json()
tick2 = time.time()
print('takes %.2f' %(tick2 - tick1) )resp 10
# 外网被占用的情况下,耗时比较久
takes 1.44

消费

import requests as req 
from pydantic import BaseModel,field_validator
import pandas as pd 
import json 
import time # group.id: 声明不同的group.id 可以重头消费
class InputConsumer(BaseModel):servers : str groupid : str = 'default01'is_commit: bool = True msg_num : int  = 3 topic : str is_json : bool = True # 外网
the_consumer = InputConsumer(servers = f'{WAN_IP}', msg_num =10, topic='mytest202')import time 
tick1 = time.time()
resp = req.post(f'http://{agent_url}/consume_msg/',json = the_consumer.dict()).json()
tick2 = time.time()
print('takes %.2f' %(tick2 - tick1) )# 内网
lan_the_consumer = InputConsumer(servers = f'{LAN_IP}', msg_num =10, topic='mytest202')
import time 
tick1 = time.time()
resp = req.post(f'http://{agent_url}/consume_msg/',json = lan_the_consumer.dict()).json()
tick2 = time.time()
print('takes %.2f' %(tick2 - tick1) )

我发现在带宽在被占满的情况,从公网拉取的消息结果为空,但是从内网可以拉取到结果。

在这里插入图片描述

原因大致如下:对应方法是保证带宽,或者在消费端进行修改
在这里插入图片描述
当我取消掉模拟耗带宽的操作(rsync大文件),此时无论WAN还是LAN都恢复正常了。
在这里插入图片描述

Q1: 使用代理,性能是否会有影响?

A1: 由于向代理发送,接受消息都要经过json序列化,效率将会大幅下降。80%以上的开销均为序列化开销。

生产者: 10万json+agent 1秒 外/ 0.78 内,仅10万json 0.5秒
消费者: 10万条 2.1秒 |1.95 |1.79

但可以看到,这样的速度仍然可以大规模使用。

Q2: 如果输错了服务器地址会怎样?

A2: 服务将陷入短暂不可用情况。在取消错误的请求后,大约5分钟,代理重连太久后才会自动取消。

结论:保存数据走内网kafka。

其他

Logstash的调试

Logstash 是一个开源的 数据收集引擎,通常用于实时数据处理和日志管理。它可以从多种来源收集数据,将其过滤、解析,并将处理后的数据发送到不同的目标存储系统。Logstash 是 ELK/Elastic Stack(Elasticsearch、Logstash、Kibana)的一部分,通常与 Elasticsearch 和 Kibana 搭配使用来构建一个完整的日志和事件管理系统。

Logstash 的主要功能

  1. 数据收集

    • Logstash 支持从各种数据源收集数据,例如日志文件、数据库、网络、消息队列等。通过插件系统,它能够轻松集成到不同的数据源环境中。
  2. 数据过滤与解析

    • Logstash 可以对收集到的数据进行过滤和解析,例如使用正则表达式提取字段,重新格式化数据,或者对数据进行清洗。
    • Logstash 的过滤器插件支持丰富的处理操作,比如 Grok 解析、JSON、日期处理、去重、聚合等。
  3. 数据输出

    • Logstash 可以将处理后的数据发送到多个目标系统,比如 Elasticsearch(用于搜索和分析)、文件、数据库、消息队列、监控系统等。

Logstash 主要的架构组件

  • Inputs(输入插件):用于指定数据来源,如文件、数据库、消息队列等。常见的输入插件包括 filesyslogkafkahttp 等。

  • Filters(过滤插件):用于处理、解析和转换数据,可以使用 Grok、正则表达式、日期处理等插件来解析复杂的日志格式。

  • Outputs(输出插件):用于定义数据的存储位置,比如发送到 Elasticsearch、存储到文件、发送到消息队列等。

常见使用场景

  1. 日志管理与分析

    • Logstash 经常与 Elasticsearch 和 Kibana 搭配使用来实现集中式日志管理,将来自不同服务的日志集中采集、分析和展示。
  2. 实时数据流处理

    • 它还可以用来处理实时数据流,例如从 Kafka 或 Redis 获取消息,对数据进行实时处理后发送到目标系统。
  3. 系统监控与安全分析

    • 在 DevOps 环境中,Logstash 用于实时监控应用程序、服务器和网络设备的日志,并通过 Kibana 展示给运维人员,实现系统健康监控和安全日志分析。

简单工作流程

  1. 输入:从不同的数据源收集数据(如文件、数据库、API 等)。
  2. 过滤:通过解析、格式化和过滤等操作对数据进行处理。
  3. 输出:将处理后的数据发送到指定目标(如 Elasticsearch、Kafka、文件等)。

示例

下面是一个简单的 Logstash 配置,它从一个日志文件中收集数据,解析后发送到 Elasticsearch:

input {file {path => "/var/log/example.log"start_position => "beginning"}
}filter {grok {match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}" }}date {match => ["timestamp", "ISO8601"]}
}output {elasticsearch {hosts => ["localhost:9200"]index => "logs-%{+YYYY.MM.dd}"}
}

这个配置会将日志文件中的数据解析为 JSON 格式,并按日期创建 Elasticsearch 索引。

Logstash 通过灵活的输入、过滤、输出插件,使它成为处理异构数据的强大工具。

日志

这段日志输出来自 Logstash,显示了一个日志事件的详细信息。下面是每个字段的解释:

9月 15 19:09:07 m7 logstash[23910]: {
9月 15 19:09:07 m7 logstash[23910]:        "@version" => "1",
9月 15 19:09:07 m7 logstash[23910]:             "log" => {
9月 15 19:09:07 m7 logstash[23910]:         "file" => {
9月 15 19:09:07 m7 logstash[23910]:             "path" => "/var/log/example.log"
9月 15 19:09:07 m7 logstash[23910]:         }
9月 15 19:09:07 m7 logstash[23910]:     },
9月 15 19:09:07 m7 logstash[23910]:          "module" => "part.b",
9月 15 19:09:07 m7 logstash[23910]:     "log_message" => "无法连接数据库,请检查网络设置",
9月 15 19:09:07 m7 logstash[23910]:       "timestamp" => "2024-09-15 19:08:15",
9月 15 19:09:07 m7 logstash[23910]:         "message" => "[2024-09-15 19:08:15] - [ERROR] - [part.b] - 无法连接数据库,请检查网络设置",
9月 15 19:09:07 m7 logstash[23910]:           "level" => "ERROR",
9月 15 19:09:07 m7 logstash[23910]:      "@timestamp" => 2024-09-15T11:09:06.865477838Z,
9月 15 19:09:07 m7 logstash[23910]:           "event" => {
9月 15 19:09:07 m7 logstash[23910]:         "original" => "[2024-09-15 19:08:15] - [ERROR] - [part.b] - 无法连接数据库,请检查网络设置"
9月 15 19:09:07 m7 logstash[23910]:     },
9月 15 19:09:07 m7 logstash[23910]:            "host" => {
9月 15 19:09:07 m7 logstash[23910]:         "name" => "m7"
9月 15 19:09:07 m7 logstash[23910]:     }
9月 15 19:09:07 m7 logstash[23910]: }

详细解释:

  1. @version: 指示事件的版本,通常为“1”,表示使用的事件格式的版本。

  2. log: 包含日志文件的信息。

    • file: 具体的日志文件信息。
      • path: 日志文件的完整路径,即日志数据的来源。
  3. module: 动态模块名称,即日志消息中包含的模块标识。这个值是你在日志记录中自定义的,比如在你的代码中你设置为 part.b

  4. log_message: 从原始消息中提取出的主要日志内容(不包含时间戳和日志级别)。

  5. timestamp: 日志事件的时间戳,表示事件发生的实际时间。

  6. message: 日志的原始格式化消息,包含时间戳、日志级别、模块名称和日志内容。

  7. level: 日志的级别,例如“ERROR”、“INFO”等。表示事件的严重性。

  8. @timestamp: Logstash 处理事件的时间戳,通常是 Logstash 解析日志并将其写入 Elasticsearch 的时间。

  9. event: 包含原始日志消息的完整文本,通常用于保持日志的原始格式。

  10. host: 提供了有关 Logstash 运行的主机的信息。

    • name: 主机名,显示 Logstash 实例所在的机器名。

这个日志条目展示了从日志文件中提取的数据,以及 Logstash 对其进行解析和处理后的结构化数据。

要特别注意日志偏移的设置,这个相当于是logstash的断点续传

在实际生产环境中,sincedb_path 选项是用于 Logstash 跟踪文件读取进度的机制,默认情况下它不会设置为 /dev/null。下面解释一下它的常用场景和配置方式:

1. sincedb 是什么?

  • sincedb 文件:用于 Logstash 记录输入插件(如 file)读取文件的当前位置。每次 Logstash 读取文件时,它会更新 sincedb 文件,以便在 Logstash 重启或系统重启时能够从上次停止的地方继续读取,而不是从头开始。
  • 位置:默认情况下,sincedb 文件存储在用户的主目录下,例如:
    • Linux: ~/.sincedb_*
    • Windows: C:\Users\Username\.sincedb_*

每个 sincedb 文件会跟踪一个特定日志文件的 inode 信息及读取进度。

2. 实际场景下 sincedb_path 的使用

  • 正常生产环境

    • 典型配置:你通常会为 sincedb_path 指定一个具体的文件路径,确保 Logstash 在重启时能够继续处理文件。例如:

      sincedb_path => "/var/lib/logstash/sincedb"
      

      在这个例子中,sincedb 文件会存储在 /var/lib/logstash/ 目录下,确保 Logstash 有足够的权限去读取和写入该文件。

  • 文件路径管理:如果你有多个不同的日志文件输入,可以为每个文件输入指定不同的 sincedb_path,以避免冲突。例如:

    input {file {path => "/var/log/app1.log"sincedb_path => "/var/lib/logstash/sincedb_app1"}file {path => "/var/log/app2.log"sincedb_path => "/var/lib/logstash/sincedb_app2"}
    }
    

3. sincedb_path => "/dev/null" 在实际中的用途

在某些特殊场景下,你可能会临时使用 /dev/null,但不建议在生产环境中使用。

使用 /dev/null 的情况:
  • 调试/开发阶段

    • 当你在开发或调试 Logstash 配置时,你可能希望每次启动 Logstash 时都从头读取日志文件。在这种情况下,你可以临时将 sincedb_path 设置为 /dev/null,这样 Logstash 每次都会忽略之前的进度,从文件的开头开始读取。

      sincedb_path => "/dev/null"
      
  • 短期任务

    • 对于一次性读取文件的任务或临时性的日志分析,你可能不需要记录进度。在这种情况下,使用 /dev/null 也是合理的。
不建议在生产环境中使用的原因:
  • 文件读取进度丢失:如果你将 sincedb_path 设置为 /dev/null,Logstash 无法保存文件读取进度。在生产环境中,如果 Logstash 服务重启或系统出现问题,你将丢失已处理文件的位置信息,Logstash 会从头开始读取整个日志文件,这可能会导致重复处理日志。

4. 总结:sincedb_path 在生产环境的最佳实践

  • 指定合适的路径:在生产环境中,建议明确指定 sincedb_path 到一个持久存储的路径,通常位于 /var/lib/logstash 之类的目录,确保 Logstash 能记录文件读取进度。

    例如:

    sincedb_path => "/var/lib/logstash/sincedb_example"
    
  • 使用 /dev/null 慎重:仅在调试、开发或一次性任务中使用 /dev/null,避免在生产环境中使用,以防日志文件重新读取时产生问题。


整体上,我们写好了程序,当逻辑较为复杂时,或者我们将之作为服务进行长期运行时,容易"失联"。我们并不知道程序/服务出了什么问题,进行定位时需要切到非常细的操作,经常达到让人望而却步的程度。

比较可行的方法是程序将日志追加到文件,然后由其他程序(如logstash)进行读取,解析,转存到es中,供监控和后续分析。

追加到日志是代价比较低,且不会犯错的操作。通过rotate,我们也避免了磁盘满的风险。

日志分为5个级别,我们关注其中四个(忽略Debug):

在这里插入图片描述
例如,Info 可以是类心跳的信息,确保程序正常运行,无论是Idle还是处理数据。给到的FeedBack是在常态运行。另一个点就是,提前准备好可以测试其功能的样本数据,隔一段时间调一次,确保无论是空载还是满载都能得到反馈。

Warning 是一些预警,例如磁盘空间不足、内存空间不足、网络带宽不足等。这些随时可能会导致程序崩溃、挂起。

Error 是一些错误,例如数据库连接中断,部分数据逻辑处理错误。

Critical 是致命性错误,例如显卡出问题了,模型无法载入。

日志文件 /var/log/example.log

[2024-09-15 19:08:15] - [INFO] - [part.a] - 系统启动完成
[2024-09-15 19:08:15] - [WARNING] - [part.b] - 磁盘空间不足,剩余空间小于10%
[2024-09-15 19:08:15] - [ERROR] - [part.b] - 无法连接数据库,请检查网络设置
[2024-09-15 19:08:15] - [CRITICAL] - [part.b] - 系统崩溃,立即采取措施

写入vim /etc/logstash/conf.d/debug_logstash.conf, grok语句解析4个变量:

  • 1 时间戳 timestamp
  • 2 日志等级:level
  • 3 模块名称:module
  • 4 消息主体:log_message

input {file {path => "/var/log/example.log"start_position => "beginning"sincedb_path => "/dev/null"}
}filter {grok {match => {"message" => "\[%{TIMESTAMP_ISO8601:timestamp}\] - \[%{LOGLEVEL:level}\] - \[%{DATA:module}\] - %{GREEDYDATA:log_message}"}}
}output {stdout { codec => rubydebug }
}

校验语句

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/debug_logstash.conf --config.test_and_exit

重启服务

systemctl restart logstash

观察结果

journalctl -u logstash -f

总体上感觉grok解析还是有点麻烦,尽量简单点好了;倒是JSON解析可能更适合我,但是显然效率会稍微低一点。

在这里插入图片描述

篇幅太长了,再写一篇续吧。

本篇:

  • 1 介绍了问题的由来,现状(api)。
  • 2 完成了设计思路,以及一些对应组件的validate
    • 1 logging : python的logging 和 logstash配合
    • 2 graph: 使用 networkx 来进行BFS计算,规则之间可以按照nx的方式定义依赖
    • 3 uuid: 使用 uuid 来表示会话
    • 4 kafka: 回顾kafka的搭建,使用kafka agent进行数据提交保存

下篇:

  • 1 重构新的规则对象
    • 核心功能:允许灵活的定义规则(graph)
  • 2 将本次的日志、会话(uuid及保存)实现
  • 3 梳理未来规则分类与合并的思路

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/426544.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据中台建设(六)—— 数据开发-提取数据价值

数据开发-提取数据价值 数据开发涉及的产品能力主要包括三部分:离线开发、实时开发和算法开发。 离线开发主要包括离线数据的加工、发布、运维管理,以及数据分析、数据探索、在线查询和及时分析相关工作。实时开发主要涉及数据的实时接入和实时处理。算…

【算法】动态规划—最长回文子序列

思路分析 关于”回文串“的问题,是面试中常见的,本文提升难度,讲一讲”最长回文子序列“问题,题目很好理解: 输入一个字符串 s,请找出 s 中的最长回文子序列长度。 比如输入 s"aecda"&#xff0c…

WSL中使用AMBER GPU串行版

前提是已经安装过wsl 1 在 WSL 2 中启用 NVIDIA CUDA 参考在 WSL 2 上启用 NVIDIA CUDA | Microsoft Learn 注意:勿在 WSL 中安装任何 Linux 显示驱动程序。Windows 显示驱动程序将同时安装本机 Windows 和 WSL 支持的常规驱动程序组件。 2 在WSL2中配置Cuda 不安…

5G毫米波阵列天线仿真——CDF计算(手动AC远场)

之前写过两个关于阵列天线获取CDF的方法,一个通过Realized Gain,一个通过Power Flow, 三个案例中都是3D中直接波束扫描,并没有展示场路结合的情况。这期我们用Power Flow的方法,手动合并AC任务的波束计算CDF。 还是用…

Linux(7)--目录文件的创建、删除、移动、复制、重命名

文章目录 1. 创建目录、文件2. 删除目录、文件3. 移动目录、文件4. 复制目录、文件5. 重命名目录、文件 1. 创建目录、文件 使用mkdir创建目录: 使用touch创建文件: 2. 删除目录、文件 使用rm可以删除文件: 使用rm -f可以强制删除文件,…

C++掉血迷宫

目录 开头程序程序的流程图程序游玩的效果下一篇博客要说的东西 开头 大家好&#xff0c;我叫这是我58。 程序 #include <iostream> #include <string> #include <cstring> using namespace std; enum RBYG {R 1,B 2,Y 4,G 7, }; struct heal {int ix…

Linux权限理解【Shell的理解】【linux权限的概念、管理、切换】【粘滞位理解】

目录 Linux权限理解1.Xshell命令以及运行原理2.linux权限的学习2.1linux权限的切换2.2linux权限的概念2.3linux权限管理2.3.1linux中文件访问者的分类2.3.2文件类型和访问权限(文件属性)2.3.2.1文件类型2.3.2.2文件权限拓展—文件的起始权限 2.3.3文件权限管理2.3.4文件权限的应…

一文搞定WeakHashMap

写在前面 在缓存场景下&#xff0c;由于内存是有限的&#xff0c;不能缓存所有对象&#xff0c;因此就需要一定的删除机制&#xff0c;淘汰掉一些对象。这个时候可能很快就想到了各种Cache数据过期策略&#xff0c;目前也有一些优秀的包提供了功能丰富的Cache&#xff0c;比如…

十八,Spring Boot 整合 MyBatis-Plus 的详细配置

十八&#xff0c;Spring Boot 整合 MyBatis-Plus 的详细配置 文章目录 十八&#xff0c;Spring Boot 整合 MyBatis-Plus 的详细配置1. MyBatis-Plus 的基本介绍2. Spring Boot 整合 MyBatis Plus 的详细配置3. Spring Boot 整合 MyBatis plus 注意事项和细节4. MyBatisx 插件的…

浅谈红外测温技术在变电站运维中的应用

0引言 随着市场经济的繁荣发展&#xff0c;社会对电力的需求持续增长。城市供电网络的规模和用电设备的总量也在不断扩大&#xff0c;这导致城市电力系统中潜在的网络安全隐患日益增多。作为电力系统核心组成部分的变压器&#xff0c;其安全、稳定的工作直接关系到电能的质量和…

总结拓展十:SAP开发计划(上)

第一节 功能开发说明书介绍 1、功能开发的基础分类 报表查询开发单据打印开发功能开发增强开发接口开发 2、屏幕元素介绍 ——程序屏幕是SAP系统与用户之间的桥梁&#xff0c;屏幕由各种不同元素布局组成 示例&#xff1a;选择屏幕界面 单选输入框 多选输入框 设定默认…

静态库 动态库

https://blog.csdn.net/mahoon411/article/details/113565482 库&#xff1a;可执行代码的二进制文件&#xff0c;里面有可以直接使用的函数&#xff0c;变量等&#xff1b;不能单独运行 因为 Linux 和 Win 的链接器、汇编器、编译器的不同&#xff0c;相同代码的库不同 Lin…

k8s介绍及部署

目录 一 Kubernetes 简介及部署方法 1.1 应用部署方式演变 1.2 容器编排应用 1.3 kubernetes 简介 1.4 K8S的设计架构 1.4.1 K8S各个组件用途 1.4.2 K8S 各组件之间的调用关系 1.4.3 K8S 的 常用名词感念 1.4.4 k8S的分层架构 二 K8S集群环境搭建 2.1 k8s中容器的管…

演示:基于WPF自绘的中国省份、城市、区县矢量地图

一、目的&#xff1a;演示一个基于WPF自绘的中国省份、城市、区县矢量地图 二、效果 国 省 市 三、功能 支持实际经纬度显示 支持平移&#xff0c;缩放等功能 显示中国地图 显示各个省份地图 显示各个省份地图&#xff08;包含在表格中&#xff0c;包含缩率图&#xff09; 显…

[数据集][目标检测]疟疾恶性疟原虫物种目标检测数据集VOC+YOLO格式948张1类别

数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;948 标注数量(xml文件个数)&#xff1a;948 标注数量(txt文件个数)&#xff1a;948 标注类别…

MySQL系列—12.Undo log

1、概念 DML 操作导致数据变化 , 将变化前的记录写入 Undo 日志。 作用 用于记录更改前的一份 copy &#xff0c;在操作出错时&#xff0c;可以用于回滚、撤销还原&#xff0c;只将数据库 逻辑地恢复到原来的样子 你 插入一条记录时&#xff0c;至少要把这条记录的主键值记下来…

Elasticsearch基础(七):Logstash如何开启死信队列

文章目录 Logstash如何开启死信队列 一、确保 Elasticsearch 输出插件启用 DLQ 支持 二、配置 Logstash DLQ 设置 三、查看死信队列 四、排查 CSV 到 Elasticsearch 数据量不一致的问题 Logstash如何开启死信队列 在 Logstash 中&#xff0c;死信队列&#xff08;Dead Le…

典型BUCK电路学习和设计

手把手教你设计12V3Abuck降压电路-2-相关输入参数讲解_哔哩哔哩_bilibili 这里是输入电容&#xff0c;先过大电容&#xff08;电解电容&#xff09;再过小电容&#xff08;陶瓷贴片电容&#xff0c;高频率波&#xff09; 输出也可以同理 开关电源不能带负载的原因&#xff0c…

RocketMQ实战与集群架构详解

目录 一、MQ简介 MQ的作用主要有以下三个方面 二、RocketMQ产品特点 1、RocketMQ介绍 2、RocketMQ特点 三、RocketMQ实战 1、快速搭建RocketMQ服务 2、快速实现消息收发 1. 命令行快速实现消息收发 2. 搭建Maven客户端项目 3、搭建RocketMQ可视化管理服务 4、升级分…

MYSQL数据库基础篇——DDL

DDL&#xff1a;DDL是数据定义语言&#xff0c;用来定义数据库对象。 一.DDL操作数据库 1.查询 ①查询所有数据库 输入&#xff1b; 得到结果&#xff1a; ②查询当前数据库 输入&#xff1b; 例如执行下面语句&#xff1a; 2.创建 输入 然后展示数据库即可得到结果&…