将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(3.纯python的实惠版)

前情:

将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(1.标准版)-CSDN博客

将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(2.换掉付费的Event Hubs)-CSDN博客

python脚本实现

厉害的小伙伴最终使用python脚本免费实现了这个功能

文件结构

以下是cursor对代码的评审意见:

这是一个用于下载 Azure Storage 访问日志并将其导入到 Elasticsearch 的项目。让我逐个文件分析:

1. 整体架构

项目采用了良好的模块化设计,主要包含以下组件:

  • main.py: 主程序入口,协调整体流程
  • azure_log_downloader.py: Azure blob 存储日志下载器
  • elasticsearch_sender.py: Elasticsearch 数据发送器
  • models.py: 数据模型定义
  • log_config.py: 日志配置
  • .env: 环境配置文件

2. 具体评审

main.py

优点:

  • 支持日期范围处理
  • 良好的错误处理和日志记录
  • 模块化的设计,职责划分清晰

建议改进:

  • 可以考虑添加命令行参数支持,使日期范围可配置
  • 可以添加并发处理支持,提高处理速度
azure_log_downloader.py

优点:

  • 清晰的类设计
  • 良好的错误处理

建议改进:

  • 可以添加重试机制
  • 可以考虑支持异步操作
elasticsearch_sender.py

优点:

  • 实现了批量处理
  • 包含重试机制
  • 良好的连接测试功能

建议改进:

  • 可以考虑添加索引模板管理
  • 可以添加更多的 ES 配置选项
models.py

优点:

  • 使用 dataclass 简化了类定义
  • 详细的字段解析逻辑
  • 良好的类型注解

建议改进:

  • 可以添加字段验证
  • 可以考虑使用 Pydantic 进行数据验证
log_config.py

优点:

  • 完善的日志配置
  • 支持文件和控制台输出

建议改进:

  • 可以考虑添加日志轮转
  • 可以添加日志级别配置选项

3. 安全性建议

  • 建议使用更安全的方式存储敏感信息(如 API keys)
  • 建议添加 SSL 证书验证
  • 建议添加请求速率限制

4. 性能建议

  • 考虑添加并发处理
  • 可以实现增量更新机制
  • 考虑添加缓存机制

5. 可维护性建议

  • 添加单元测试
  • 添加更详细的文档
  • 考虑添加监控指标
  • 添加 CI/CD 配置

总体来说,这是一个设计良好的项目,代码质量较高,有良好的错误处理和日志记录。主要可以在性能优化、安全性和可维护性方面进行进一步改进。

具体脚本

1. azure_log_downloader.py:

from azure.storage.blob import BlobServiceClient
import json
from models import StorageOperationLogclass AzureLogDownloader:def __init__(self, connection_string, container_name, blob_name, logger=None):self.logger = loggerself.connection_string = connection_stringself.container_name = container_nameself.blob_name = blob_namedef _get_blob_client(self):blob_service_client = BlobServiceClient.from_connection_string(self.connection_string)blob_client = blob_service_client.get_blob_client(container=self.container_name, blob=self.blob_name)return blob_clientdef download_and_transform(self):"""Download and transform log data from Azure storage"""try:blob_client = self._get_blob_client()if not blob_client.exists():self.logger.info(f"Blob does not exist, skipping: {self.blob_name}")return []blob_data = blob_client.download_blob().readall().decode('utf-8')transformed_entries = []for line in blob_data.splitlines():if line.strip():try:log_entry = json.loads(line)log_obj = StorageOperationLog.from_log_entry(log_entry, self.logger)if log_obj:transformed_entries.append(log_obj)except json.JSONDecodeError as e:self.logger.error(f"Error parsing line: {str(e)}")continueself.logger.info(f"Downloaded and transformed {len(transformed_entries)} logs")return transformed_entriesexcept Exception as e:self.logger.error(f"Error downloading blob: {str(e)}")self.logger.error(f"Blob: {self.blob_name}, Container: {self.container_name}")self.logger.error(f"Error type: {type(e).__name__}")return []

2. elasticsearch_sender.py:

from elasticsearch import Elasticsearch, helpers
import time
import uuidclass ElasticsearchSender:def __init__(self, host, api_key=None, index_name="logs", logger=None):self.logger = loggerself.config = {'hosts': host,'timeout': 30,'retry_on_timeout': True,'max_retries': 3,'verify_certs': False,'ssl_show_warn': False,'use_ssl': True}if api_key:self.config['api_key'] = api_keyself.index_name = index_nameself.es = Elasticsearch(**self.config)def test_connection(self):"""Test Elasticsearch connection"""try:info = self.es.info()self.logger.info("\nElasticsearch Server Info:")self.logger.info(f"Version: {info['version']['number']}")self.logger.info(f"Cluster Name: {info['cluster_name']}")return Trueexcept Exception as e:self.logger.error(f"\nElasticsearch connection failed: {str(e)}")return Falsedef send_logs(self, log_entries, batch_size=500, max_retries=3):"""Send logs to Elasticsearch"""def generate_actions():for entry in log_entries:doc_data = entry.__dict__.copy()if 'time' in doc_data:doc_data['@timestamp'] = doc_data.pop('time')action = {'_index': self.index_name,'_id': str(uuid.uuid4()),'_source': doc_data}yield actionsuccess_count = 0failure_count = 0retry_count = 0while retry_count < max_retries:try:success, failed = helpers.bulk(self.es,generate_actions(),chunk_size=batch_size,raise_on_error=False,raise_on_exception=False)success_count += successfailure_count += len(failed) if failed else 0self.logger.info(f"\nBatch processing results:")self.logger.info(f"- Successfully indexed: {success_count} documents")self.logger.info(f"- Failed: {failure_count} documents")if not failed:breakretry_count += 1if retry_count < max_retries:self.logger.info(f"Retrying... (Attempt {retry_count + 1}/{max_retries})")time.sleep(2 ** retry_count)except Exception as e:self.logger.error(f"\nBulk indexing error: {str(e)}")retry_count += 1if retry_count < max_retries:self.logger.info(f"Retrying... (Attempt {retry_count + 1}/{max_retries})")time.sleep(2 ** retry_count)else:self.logger.info("Maximum retry attempts reached")breakreturn success_count, failure_count

3. log_config.py:

import logging
import os
from datetime import UTC, datetimedef setup_logger(target_date: datetime = None, log_prefix: str = "app"):base_dir = os.path.dirname(os.path.abspath(__file__))log_dir = os.path.join(base_dir, 'logs')if not os.path.exists(log_dir):os.makedirs(log_dir)current_time = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")target_date_str = target_date.strftime("%Y%m%d") if target_date else "None"log_file = os.path.join(log_dir, f'{log_prefix}_target_date_{target_date_str}_export_at_{current_time}.log')logger = logging.getLogger('AccessLog')logger.setLevel(logging.INFO)file_handler = logging.FileHandler(log_file, encoding='utf-8')file_handler.setLevel(logging.INFO)console_handler = logging.StreamHandler()console_handler.setLevel(logging.INFO)formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')file_handler.setFormatter(formatter)console_handler.setFormatter(formatter)logger.addHandler(file_handler)logger.addHandler(console_handler)return logger

4. models.py:

from dataclasses import dataclass
from datetime import datetime
import re
from typing import Optional@dataclass
class StorageOperationLog:time: datetimecategory: Optional[str]operationName: Optional[str]callerIpAddress: Optional[str]location: Optional[str]uri: Optional[str]durationMs: Optional[int]referrerHeader: Optional[str]userAgentHeader: Optional[str]requestBodySize: Optional[int]responseBodySize: Optional[int]serverLatencyMs: Optional[int]objectKey: Optional[str]functionName: Optional[str]file_extension: Optional[str]@staticmethoddef parse_object_key(object_key: str, logger=None) -> tuple[Optional[str], Optional[str]]:"""Parse objectKey to get institution_id and functionName"""try:container_match = re.search(r'container-(\d+)', object_key)parts = object_key.split('/')function_name = Noneif container_match:container_index = next((i for i, part in enumerate(parts) if 'container-' in part), None)if container_index is not None and container_index + 1 < len(parts):function_name = parts[container_index + 1]file_extension = Noneif parts and '.' in parts[-1]:file_extension = parts[-1].split('.')[-1].lower()return function_name, file_extensionexcept Exception as e:if logger:logger.error(f"Error parsing object_key {object_key}: {str(e)}")return None, None@classmethoddef from_log_entry(cls, entry: dict[str, any], logger=None) -> Optional['StorageOperationLog']:"""Create StorageOperationLog instance from raw log entry"""try:properties = entry.get('properties', {})object_key = properties.get('objectKey', '')function_name, file_extension = cls.parse_object_key(object_key)return cls(time=entry.get('time'),category=entry.get('category'),operationName=entry.get('operationName'),callerIpAddress=entry.get('callerIpAddress'),location=entry.get('location'),uri=entry.get('uri'),durationMs=int(entry.get('durationMs')) if entry.get('durationMs') is not None else None,referrerHeader=properties.get('referrerHeader'),userAgentHeader=properties.get('userAgentHeader'),requestBodySize=int(properties.get('requestBodySize')) if properties.get('requestBodySize') is not None else None,responseBodySize=int(properties.get('responseBodySize')) if properties.get('responseBodySize') is not None else None,serverLatencyMs=int(properties.get('serverLatencyMs')) if properties.get('serverLatencyMs') is not None else None,objectKey=object_key,functionName=function_name,file_extension=file_extension)except Exception as e:if logger:logger.error(f"Error creating StorageOperationLog: {str(e)}")return Nonedef __post_init__(self):if isinstance(self.time, str):if 'Z' in self.time:time_parts = self.time.split('.')if len(time_parts) > 1:microseconds = time_parts[1].replace('Z', '')[:6]time_str = f"{time_parts[0]}.{microseconds}Z"self.time = datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%S.%fZ")else:self.time = datetime.strptime(self.time, "%Y-%m-%dT%H:%M:%SZ")

5. main.py:

from log_config import setup_logger
from azure_log_downloader import AzureLogDownloader
from elasticsearch_sender import ElasticsearchSender
from datetime import datetime, timedelta, UTC
from dotenv import load_dotenv
import osload_dotenv()def _get_index_name(target_date: datetime):"""Get full index name for the specified date"""return os.getenv('ELASTICSEARCH_INDEX_TEMPLATE', 'logs-{year}-{month}').format(year=target_date.year,month=target_date.month)def _get_blob_name_list(target_date: datetime):"""Get blob paths for all hours of the specified date"""blobs = []for hour in range(24):blob_time = target_date.replace(hour=hour, minute=0, second=0, microsecond=0)blob_name = os.getenv('AZURE_STORAGE_BLOB_TEMPLATE', 'logs/y={year}/m={month}/d={day}/h={hour}').format(year=blob_time.year,month=blob_time.month,day=blob_time.day,hour=blob_time.hour)blobs.append(blob_name)return blobsdef main():start_date = datetime(2024, 1, 1, tzinfo=UTC)end_date = datetime(2024, 1, 2, tzinfo=UTC)current_date = start_datewhile current_date <= end_date:target_date = current_datelogger = setup_logger(target_date, os.getenv('LOG_PREFIX', 'app'))try:logger.info(f"\nProcessing data for {current_date.date()}")elasticsearch_index = _get_index_name(target_date)sender = ElasticsearchSender(os.getenv('ELASTICSEARCH_HOST', 'http://localhost:9200'),os.getenv('ELASTICSEARCH_API_KEY'),elasticsearch_index,logger)if not sender.test_connection():logger.error("Elasticsearch connection failed")current_date += timedelta(days=1)continuetotal_logs = total_success = total_failed = 0blobs = _get_blob_name_list(target_date)for container in os.getenv('AZURE_STORAGE_CONTAINERS', 'logs').split(','):logger.info(f"\nProcessing container: {container}")for blob_name in blobs:logger.info(f"\nProcessing blob: {blob_name}")downloader = AzureLogDownloader(os.getenv('AZURE_STORAGE_URI'),container,blob_name,logger)try:log_entries = downloader.download_and_transform()success, failed = sender.send_logs(log_entries)total_logs += len(log_entries)total_success += successtotal_failed += failedexcept Exception as e:logger.error(f"Error processing {blob_name}: {str(e)}")continuelogger.info(f"\n{current_date.date()} Processing completed:")logger.info(f"Total documents processed: {total_logs}")logger.info(f"Successfully indexed: {total_success}")logger.info(f"Failed: {total_failed}")finally:for handler in logger.handlers[:]:handler.close()logger.removeHandler(handler)current_date += timedelta(days=1)if __name__ == "__main__":main()

6. .env :

ELASTICSEARCH_HOST=http://localhost:9200
ELASTICSEARCH_API_KEY=your_api_key
ELASTICSEARCH_INDEX_TEMPLATE=logs-{year}-{month}
AZURE_STORAGE_URI=your_storage_connection_string
AZURE_STORAGE_CONTAINERS=logs
AZURE_STORAGE_BLOB_TEMPLATE=logs/y={year}/m={month}/d={day}/h={hour}
LOG_PREFIX=app


前情后续:

将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(1.标准版)-CSDN博客

将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(2.换掉付费的Event Hubs)-CSDN博客

将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(3.纯python的实惠版)-CSDN博客




本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/5371.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Kafka-常见的问题解答

讲一讲分布式消息中间件 问题 什么是分布式消息中间件&#xff1f;消息中间件的作用是什么&#xff1f;消息中间件的使用场景是什么&#xff1f;消息中间件选型&#xff1f; 分布式消息是一种通信机制&#xff0c;和 RPC、HTTP、RMI 等不一样&#xff0c;消息中间件采用分布式…

Android系统开发(六):从Linux到Android:模块化开发,GKI内核的硬核科普

引言&#xff1a; 今天我们聊聊Android生态中最“硬核”的话题&#xff1a;通用内核镜像&#xff08;GKI&#xff09;与内核模块接口&#xff08;KMI&#xff09;。这是内核碎片化终结者的秘密武器&#xff0c;解决了内核和供应商模块之间无尽的兼容性问题。为什么重要&#x…

数据结构-二叉树

树的相关概念&#xff1a; 1、节点的度&#xff1a;树中一个节点的孩子个数称为该节点的度&#xff0c; 所有节点的度的最大值是树的度 2、分支节点&#xff1a;度大于0的节点称为分支节点 3、叶子结点&#xff1a;度为0的节点称为叶子结点 4、节点的层次&#xff08;深度&…

他把智能科技引入现代农业领域

江苏田倍丰农业科技有限公司&#xff08;以下简称“田倍丰”&#xff09;是一家专注于粮油种植的农业科技公司&#xff0c;为拥有300亩以上田地的大户提供全面的解决方案。田倍丰通过与当地政府合作&#xff0c;将土地承包给大户&#xff0c;并提供农资和技术&#xff0c;实现利…

python进程池、线程池

Python广为使用的并发处理库futures使用入门与内部原理_concurrent.futures-CSDN博客 ThreadPoolExecutor(max_workers1) 池中至多创建max_workers个线程的池来同时异步执行&#xff0c;返回Executor实例、支持上下文&#xff0c;进入时返回自己&#xff0c;退出时调用 submit(…

51c~SLAM~合集1

我自己的原文哦~ https://blog.51cto.com/whaosoft/12327374 #GSLAM 自动驾驶相关~~~ 一个通用的SLAM架构和基准 GSLAM&#xff1a;A General SLAM Framework and Benchmark 开源代码&#xff1a;https://github.com/zdzhaoyong/GSLAM SLAM技术最近取得了许多成功&am…

Node.js 完全教程:从入门到精通

Node.js 完全教程&#xff1a;从入门到精通 Node.js 是一个基于 Chrome V8 引擎的 JavaScript 运行环境&#xff0c;允许开发者在服务器端使用 JavaScript。它的非阻塞 I/O 和事件驱动架构使得 Node.js 非常适合于构建高性能的网络应用。本文将详细介绍 Node.js 的安装、基本语…

【JVM-9】Java性能调优利器:jmap工具使用指南与应用案例

在Java应用程序的性能调优和故障排查中&#xff0c;jmap&#xff08;Java Memory Map&#xff09;是一个不可或缺的工具。它可以帮助开发者分析Java堆内存的使用情况&#xff0c;生成堆转储文件&#xff08;Heap Dump&#xff09;&#xff0c;并查看内存中的对象分布。无论是内…

(二叉树)

我们今天就开始引进一个新的数据结构了&#xff1a;我们所熟知的&#xff1a;二叉树&#xff1b; 但是我们在引进二叉树之前我们先了解一下树&#xff1b; 树 树的概念和结构&#xff1a; 树是⼀种⾮线性的数据结构&#xff0c;它是由 n &#xff08; n>0 &#xff09; …

电脑如何访问手机文件?

手机和电脑已经深深融入了我们的日常生活&#xff0c;无时无刻不在为我们提供服务。除了电脑远程操控电脑外&#xff0c;我们还可以在电脑上轻松地访问Android或iPhone手机上的文件。那么&#xff0c;如何使用电脑远程访问手机上的文件呢&#xff1f; 如何使用电脑访问手机文件…

ABP - 缓存模块(1)

ABP - 缓存模块&#xff08;1&#xff09; 1. 与 .NET Core 缓存的关系和差异2. Abp 缓存的使用2.1 常规使用2.2 非字符串类型的 Key2.3 批量操作 3. 额外功能 1. 与 .NET Core 缓存的关系和差异 ABP 框架中的缓存系统核心包是 Volo.Abp.Caching &#xff0c;而对于分布式缓存…

【RAG落地利器】向量数据库Chroma入门教程

安装部署 官方有pip安装的方式&#xff0c;为了落地使用&#xff0c;我们还是采用Docker部署的方式&#xff0c;参考链接来自官方部署: https://cookbook.chromadb.dev/running/running-chroma/#docker-compose-cloned-repo 我们在命令终端运行&#xff1a; docker run -d --…

基于Python django的音乐用户偏好分析及可视化系统设计与实现

1.1 论文背景 随着信息技术的快速发展&#xff0c;在线音乐服务已成为日常生活的重要组成部分。QQ音乐&#xff0c;凭借其创新的音乐推荐算法和独特的社交特性&#xff0c;成功在竞争激烈的市场中获得一席之地。该平台的歌单文化和评论文化不仅满足了用户自尊和自我实现的需求…

以Python构建ONE FACE管理界面:从基础至进阶的实战探索

一、引言 1.1 研究背景与意义 在人工智能技术蓬勃发展的当下,面部识别技术凭借其独特优势,于安防、金融、智能终端等众多领域广泛应用。在安防领域,可助力监控系统精准识别潜在威胁人员,提升公共安全保障水平;金融行业中,实现刷脸支付、远程开户等便捷服务,优化用户体…

以单用户模式启动 Linux 的方法

注&#xff1a;本文为 “Linux 启动单用户模式” 相关文章合辑。 未整理去重。 以单用户模式启动 linux 的三种方法 作者&#xff1a; Magesh Maruthamuthu 译者&#xff1a; LCTT Xiaobin.Liu 2020-05-03 23:01 单用户模式&#xff0c;也被称为维护模式&#xff0c;超级用户…

【C++】size_t全面解析与深入拓展

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 &#x1f4af;前言&#x1f4af;一、什么是size_t&#xff1f;为什么需要size_t&#xff1f; &#x1f4af;二、size_t的特性与用途1. size_t是无符号类型示例&#xff1a; 2. size_t的跨平台适应性示例对…

YOLOv9改进,YOLOv9检测头融合RFAConv卷积,适合目标检测、分割任务

摘要 空间注意力已广泛应用于提升卷积神经网络(CNN)的性能,但它存在一定的局限性。作者提出了一个新的视角,认为空间注意力机制本质上解决了卷积核参数共享的问题。然而,空间注意力生成的注意力图信息对于大尺寸卷积核来说是不足够的。因此,提出了一种新型的注意力机制—…

Mysql触发器(学习自用)

一、介绍 二、触发器语法 注意&#xff1a;拿取新的数据时用new&#xff0c;旧数据用old。

即现软著工具 - 让软著申请更高效

在软件著作权申请的过程中&#xff0c;开发者常常会遇到代码整理、统计和生成证明文件等繁琐且复杂的任务。为了解决这些问题&#xff0c;提高申请效率和成功率&#xff0c;给大家介绍一款工具&#xff1a;即现软著工具。 即现软著工具&#xff0c;能够快速整理软著申请的程序鉴…

Java Web开发高级——单元测试与集成测试

测试是软件开发的重要环节&#xff0c;确保代码质量和功能的正确性。在Spring Boot项目中&#xff0c;单元测试和集成测试是常用的两种测试类型&#xff1a; 单元测试&#xff1a;测试单个模块&#xff08;如类或方法&#xff09;是否按预期工作。集成测试&#xff1a;测试多个…