一、摘要
高性能,高并发的读取图片,并将图片传输到服务器的应用场景很多,比如上传图片到网站,将图片提交到后台推理等。这篇文章实现一种多线程并发方式将图片提交到后台。
二、多线程发送请求的实现方法
1. 使用ThreadPoolExecutor
线程池
通过线程池管理并发请求,避免手动创建/销毁线程的开销,且支持动态控制并发量。
from concurrent.futures import ThreadPoolExecutor
import requestsdef send_image_to_service(image_path, service_url):"""单张图片的请求发送逻辑"""try:with open(image_path, 'rb') as f:response = requests.post(service_url, files={'image': f})return response.status_codeexcept Exception as e:print(f"请求失败: {image_path}, 错误: {e}")return None# 改造后的多线程发送逻辑
def batch_send_images(image_paths, service_url, max_workers=10):with ThreadPoolExecutor(max_workers=max_workers) as executor:futures = [executor.submit(send_image_to_service, path, service_url)for path in image_paths]# 可选:获取所有请求结果results = [future.result() for future in futures]return results
2. 使用asyncio
异步请求(更高性能)
对于高频请求场景(如每秒数百张图片),异步请求能进一步减少I/O等待时间:
import aiohttp
import asyncioasync def async_send_image(session, image_path, service_url):try:with open(image_path, 'rb') as f:async with session.post(service_url, data={'image': f}) as response:return await response.status()except Exception as e:print(f"异步请求失败: {image_path}, 错误: {e}")return Noneasync def async_batch_send(image_paths, service_url, max_concurrent=20):semaphore = asyncio.Semaphore(max_concurrent) # 限制并发数async with aiohttp.ClientSession() as session:tasks = []for path in image_paths:async with semaphore:task = asyncio.create_task(async_send_image(session, path, service_url))tasks.append(task)return await asyncio.gather(*tasks)
三、集成到原有代码中的示例
# 假设已生成所有图片路径:image_paths = ["frames/frame_0001.jpg", ...]
service_url = "http://your-service.com/upload"# 多线程发送请求(选择以下一种方式)
# 方式1:线程池
batch_send_images(image_paths, service_url, max_workers=10)# 方式2:异步请求(需在异步环境中运行)
asyncio.run(async_batch_send(image_paths, service_url, max_concurrent=20))
四、优化注意事项
-
并发数控制
• 根据服务端承载能力调整max_workers
或max_concurrent
,避免因过高并发导致服务崩溃。
• 建议通过压力测试确定最佳值(如从10逐步增加)。 -
错误处理与重试
• 在send_image_to_service
函数中增加重试机制(如retrying
库)。
• 记录失败请求的图片路径,便于后续补传。 -
性能监控
• 使用tqdm
库显示进度条(参考网页4的优化方法):from tqdm import tqdm with ThreadPoolExecutor(...) as executor:futures = [executor.submit(...) for path in image_paths]results = []for future in tqdm(futures, desc="发送进度"):results.append(future.result())
五、适用场景对比
方法 | 适用场景 | 性能优势 |
---|---|---|
ThreadPoolExecutor | 简单并发控制,兼容性高 | 中等,适合低频请求 |
asyncio | 高频请求(如每秒百次以上) | 高,资源占用更低 |
通过以上改造,您可以在不修改视频切片逻辑的前提下,将图片请求的吞吐量提升至原有单线程的10倍以上(具体取决于服务端响应速度)。若需进一步优化,可结合异步IO与连接池技术(如aiohttp
的持久化会话)。