Python异步编程:使用`asyncio`和`aiofiles`进行高效的文件批量写入

Python异步编程:使用`asyncio`和`aiofiles`进行高效的文件批量写入

    • 1. 异步编程基础
      • 1.1 `asyncio`和`await`
      • 1.2 `aiofiles`
    • 2. 异步文件批量写入示例
      • 2.1 代码结构
      • 2.2 代码实现
      • 2.3 代码解释
        • 2.3.1 `BatchWriter`类
        • 2.3.2 `main`函数
    • 3. 其他示例代码
      • 3.1 简单的异步文件写入
      • 3.2 异步文件读取
      • 3.3 使用`asyncio.Lock`保护共享资源
      • 3.4 使用`asyncio.wait_for`设置超时
    • 4. 总结

在现代编程中,异步编程已经成为提高程序性能的重要手段之一。特别是在处理I/O密集型任务时,异步编程可以显著提高程序的效率。Python的asyncio库和第三方库aiofiles为我们提供了强大的工具,使得异步文件操作变得简单而高效。

本文将通过一个具体的示例,介绍如何使用asyncioaiofiles进行异步文件批量写入,并详细解释其中的关键技术点。

1. 异步编程基础

1.1 asyncioawait

asyncio是Python的标准库,用于编写异步代码。async关键字用于定义一个异步函数,而await关键字用于等待一个协程(coroutine)完成。

import asyncioasync def hello_world():print("Hello")await asyncio.sleep(1)print("World")asyncio.run(hello_world())

1.2 aiofiles

aiofiles是一个第三方库,提供了异步文件操作的功能。通过aiofiles.open可以异步打开文件,并通过await f.write进行异步写入。

import asyncio
import aiofilesasync def write_to_file(filename, content):async with aiofiles.open(filename, 'a') as f:await f.write(content + '\n')async def main():await write_to_file('example.txt', 'Hello, World!')print("Data written to file")asyncio.run(main())

2. 异步文件批量写入示例

2.1 代码结构

我们将实现一个BatchWriter类,用于批量写入数据到文件中。该类的主要功能包括:

  • 将数据添加到缓冲区。
  • 当缓冲区达到一定大小时,将缓冲区中的数据批量写入文件。
  • 使用asyncio.Lock保护共享资源,避免竞态条件。
  • 使用asyncio.wait_for设置写入操作的超时时间。

2.2 代码实现

import asyncio
import json
import time
from collections import deque
import aiofilesclass BatchWriter:def __init__(self, filename: str, batch_size: int = 100):self.filename = filenameself.batch_size = batch_sizeself.buffer = deque()self.lock = asyncio.Lock()async def add(self, data: dict):print(f"Adding data: {data}")async with self.lock:print("Lock acquired in add")self.buffer.append(data)print(f"Buffer size after adding: {len(self.buffer)}")should_flush = len(self.buffer) >= self.batch_sizeprint("Releasing lock in add")if should_flush:print("Buffer size exceeded batch_size, calling flush")await self.flush()async def flush(self):print("Starting flush")buffer_to_write = []async with self.lock:if not self.buffer:print("Buffer is empty, exiting flush")return# 将缓冲区内容复制到临时列表并清空缓冲区buffer_to_write = list(self.buffer)self.buffer.clear()print(f"Flushing buffer of size: {len(buffer_to_write)}")# 在锁外执行文件写入if buffer_to_write:try:print(f"Writing {len(buffer_to_write)} items to file")await asyncio.wait_for(self._write_to_file(buffer_to_write), timeout=10)print("Finished writing to file")except asyncio.TimeoutError:print("Timeout while writing to file")# 写入失败时,将数据放回缓冲区async with self.lock:self.buffer.extendleft(reversed(buffer_to_write))except Exception as e:print(f"Error writing to file: {e}")# 写入失败时,将数据放回缓冲区async with self.lock:self.buffer.extendleft(reversed(buffer_to_write))print("Flush complete")async def _write_to_file(self, buffer_to_write):async with aiofiles.open(self.filename, 'a') as f:for data in buffer_to_write:await f.write(json.dumps(data, ensure_ascii=False) + '\n')async def main():start_time = time.time()writer = BatchWriter(filename="output.jsonl", batch_size=5)async def generate_data():for i in range(10):data = {"id": i, "value": f"data_{i}"}print(f"Generating data: {data}")await writer.add(data)await asyncio.sleep(0.1)print("Starting data generation")await generate_data()print("Finished data generation")print("Calling final flush")await writer.flush()  # 最终刷新缓冲区end_time = time.time()print(f"Total time: {end_time - start_time} seconds")if __name__ == "__main__":asyncio.run(main())

2.3 代码解释

2.3.1 BatchWriter
  • __init__方法:初始化文件名、批量大小、缓冲区和锁。
  • add方法:将数据添加到缓冲区,并在缓冲区达到批量大小时调用flush方法。
  • flush方法:将缓冲区中的数据批量写入文件。如果写入失败(如超时),将数据重新放回缓冲区。
  • _write_to_file方法:异步写入数据到文件。
2.3.2 main函数
  • generate_data协程:生成数据并调用add方法将数据添加到缓冲区。
  • main函数:启动数据生成,并在数据生成完成后调用flush方法进行最终的缓冲区刷新。

3. 其他示例代码

3.1 简单的异步文件写入

import asyncio
import aiofilesasync def write_to_file(filename, content):async with aiofiles.open(filename, 'a') as f:await f.write(content + '\n')async def main():await write_to_file('example.txt', 'Hello, World!')print("Data written to file")asyncio.run(main())

3.2 异步文件读取

import asyncio
import aiofilesasync def read_from_file(filename):async with aiofiles.open(filename, 'r') as f:content = await f.read()return contentasync def main():content = await read_from_file('example.txt')print(f"File content: {content}")asyncio.run(main())

3.3 使用asyncio.Lock保护共享资源

import asyncio
import aiofilesclass FileWriter:def __init__(self, filename):self.filename = filenameself.lock = asyncio.Lock()async def write(self, content):async with self.lock:async with aiofiles.open(self.filename, 'a') as f:await f.write(content + '\n')async def main():writer = FileWriter('example.txt')async def write_data(data):await writer.write(data)await asyncio.gather(write_data('Data 1'),write_data('Data 2'),write_data('Data 3'))print("All data written to file")asyncio.run(main())

3.4 使用asyncio.wait_for设置超时

import asyncio
import aiofilesasync def write_to_file(filename, content):async with aiofiles.open(filename, 'a') as f:await f.write(content + '\n')async def main():try:await asyncio.wait_for(write_to_file('example.txt', 'Hello, World!'), timeout=1)print("Data written to file")except asyncio.TimeoutError:print("Operation timed out")asyncio.run(main())

4. 总结

通过本文的介绍和示例代码,我们了解了如何使用asyncioaiofiles进行异步文件操作。异步编程可以显著提高I/O密集型任务的效率,特别是在处理大量并发任务时。使用asyncio.Lock可以保护共享资源,避免竞态条件。使用asyncio.wait_for可以设置操作的超时时间,防止无限期等待。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/455989.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

sql-labs靶场第二十一关测试报告

目录 一、测试环境 1、系统环境 2、使用工具/软件 二、测试目的 三、操作过程 1、寻找注入点 2、注入数据库 ①寻找注入方法 ②爆库,查看数据库名称 ③爆表,查看security库的所有表 ④爆列,查看users表的所有列 ⑤成功获取用户名…

软件设计师:软件工程

文章目录 一、开发模型(1)瀑布模型(需求明确)(2)增量模型(快速构建)(3)演化模型(迭代模型)(3.1)原型模型&…

基于KV260的基础视频链路通路(MIPI+Demosaic+VDMA)

目录 1. 简介 1.1 要点 1.2 背景 1.2.1 Got stuck 1.2.2 Cant be Initialized 2. Overlay 2.1 参考 Overlay 2.1.1 KV260 Base 2.1.2 Pynq-CV-OV5640 2.2 自建 Overlay 2.2.1 IIC IP 2.2.2 MIPI CSI-2 Rx 2.2.3 AXI4-S Subset 2.2.4 Demosaic 2.2.5 Pixel Pack …

Pandas模块之垂直或水平交错条形图

目录 df.plot() 函数Pandas模块之垂直条形图Pandas模块之水平交错条形图 df.plot() 函数 df.plot() 是 Pandas 中的一个函数,用于绘制数据框中的数据。它是基于 Matplotlib 库构建的,可以轻松地创建各种类型的图表,包括折线图、柱状图、散点…

html----图片按钮,商品展示

源码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>图标</title><style>.box{width:…

中国建设银行广东省分行珠海市分行营业网点装修工程采购项目市场调研供应商征集公告

中国建设银行广东省分行珠海市分行营业网点装修工程采购项目市场调研 供应商征集公告 根据业务发展需要&#xff0c;中国建设银行广东省分行现对珠海市分行2025-2026年度网点装修工程采购项目进行供应商市场调研&#xff0c;有关事宜公告如下&#xff1a;

【案例演示】图像描述大模型示例及概念解释

【案例演示】图像描述大模型示例及概念解释 一、案例演示模型描述期望模型使用方式以及适用范围模型功能演示 二、大模型开源平台概览模型库的定义大模型开源平台 一、案例演示 模型链接&#xff1a;https://modelscope.cn/models/iic/mplug_image-captioning_coco_base_zh 模…

XML\XXE漏洞基本原理

前言 欢迎来到我的博客 个人主页:北岭敲键盘的荒漠猫-CSDN博客 本文整理XXE漏洞的相应信息 XML与XXE漏洞 这个东西有许多叫法&#xff0c;XML漏洞与XXE漏洞差不多都是一个东西。 这个漏洞是出现在XMl上的&#xff0c;然后可以叫他XXE注入漏洞。 XML简介 XML是一种数据的传输…

WISE:重新思考大语言模型的终身模型编辑与知识记忆机制

论文地址&#xff1a;https://arxiv.org/abs/2405.14768https://arxiv.org/abs/2405.14768 1. 概述 随着世界知识的不断变化&#xff0c;大语言模型&#xff08;LLMs&#xff09;需要及时更新&#xff0c;纠正其生成的虚假信息或错误响应。这种持续的知识更新被称为终身模型编…

npm install 安装很慢怎么办?

安装源管理器nrm sudo npm install -g nrm #macOSnpm install -g nrm #Windows以管理员身份运行 安装完毕之后通过以下命令可以切换你想要的源 nrm ls #查看源列表* npm ---------- https://registry.npmjs.org/yarn --------- https://registry.yarnpkg.com/tencent ------…

FPGA第 13 篇,使用 Xilinx Vivado 创建项目,点亮 LED 灯,Vivado 的基本使用(点亮ZYNQ-7010开发板的LED灯)

前言 在FPGA设计中&#xff0c;Xilinx Vivado软件是一款功能强大的设计工具&#xff0c;它不仅支持硬件描述语言&#xff08;HDL&#xff09;的开发&#xff0c;还提供了丰富的图形化设计界面&#xff0c;方便用户进行硬件设计、调试和测试。这里我们将详细介绍&#xff0c;如…

操作系统Linux指令

1.注册表文件是Windows操作系统中的一种特殊文件&#xff0c;主要用于存储系统设置和用户配置信息。 这些文件通过REG文件扩展名进行标识&#xff0c;用户可以通过双击REG文件将其内容导入注册表中&#xff0c;从而对系统设置进行修改。 REG文件的特点是功能强大、灵活&#xf…

JAVA面试八股文(五)

#1024程序员节&#xff5c;征文# 在1024程序员节这个特别的日子里&#xff0c;首先&#xff0c;我想对每一位程序员表示最诚挚的祝贺&#xff01;祝愿大家在未来的日子里&#xff0c;能够继续热爱编程、追求卓越&#xff0c;携手共创更美好的科技未来&#xff01;让我们共同庆祝…

进程间通信(二)消息队列、共享内存、信号量

文章目录 进程间通信System V IPC概述System V IPC 对象的访问消息队列示例--使用消息队列实现进程间的通信 共享内存示例--使用共享内存实现父子进程间的通信&#xff08;进程同步&#xff09;示例--使用进程实现之前的ATM案例&#xff08;进程互斥&#xff09; 信号量示例--利…

Linux笔记---vim的使用

1. vim的基本概念 Vim是一款功能强大的文本编辑器&#xff0c;它起源于Unix系统的vi编辑器&#xff0c;并在其基础上进行了许多改进和增强。 Vim以其高效的键盘操作、高度的可定制性和强大的文本处理能力而闻名&#xff0c;尤其受程序员和系统管理员的欢迎。 Vim支持多种模式…

STM32之基本定时器TIM6和TIM7

1.定时器概念和作用 在编程任务中&#xff0c;定时器是非常常用的一个问题。当需要定时发送数据&#xff0c;定时起某个任务&#xff0c;定时做某个操作等等&#xff0c;这些都离不开定时器。本文基于以STM32F4xx系列开发板&#xff0c;介绍一下基本定时器。 2.基本定时器TIM…

基于Ubuntu24.04,下载并编译Android12系统源码 (二)

1. 前言 上篇文章&#xff0c;我们基于Ubuntu24.04&#xff0c;已经成功下载下来了Android12的源码&#xff0c;这篇文章我们会接着上文&#xff0c;基于Ubuntu24.04来编译Android源码。 2. 编译源码 2.1 了解源码编译的名词 Makefile &#xff1a; Android平台的一个编译系…

鸿蒙网络编程系列28-服务端证书锁定防范中间人攻击示例

1. TLS通讯中间人攻击及防范简介 TLS安全通讯的基础是基于对操作系统或者浏览器根证书的信任&#xff0c;如果CA证书签发机构被入侵&#xff0c;或者设备内置证书被篡改&#xff0c;都会导致TLS握手环节面临中间人攻击的风险。其实&#xff0c;这种风险被善意利用的情况还是很…

PHP企业门店订货通进销存系统小程序源码

订货通进销存系统&#xff0c;企业运营好帮手&#xff01; &#x1f4e6; 开篇&#xff1a;告别繁琐&#xff0c;企业运营新选择 嘿&#xff0c;各位企业主和创业者们&#xff01;今天我要给大家介绍一款超实用的企业运营神器——“订货通进销存系统”。在这个数字化时代&…

Docker入门之构建

Docker构建概述 Docker Build 实现了客户端-服务器架构&#xff0c;其中&#xff1a; 客户端&#xff1a;Buildx 是用于运行和管理构建的客户端和用户界面。服务器&#xff1a;BuildKit 是处理构建执行的服务器或构建器。 当您调用构建时&#xff0c;Buildx 客户端会向 Bui…