# Python Concurrent Programming: From Basics to Practice 🚀
## 1. Multithreading Basics 🧵
Multithreading is an important way to achieve concurrency; Python provides the `threading` module to support multithreaded programming.
### 1.1 Basic Thread Operations
```python
import threading
import time
from typing import List, Callable


class ThreadManager:
    """Thread manager: creates and keeps track of threads."""

    def __init__(self):
        self.threads: List[threading.Thread] = []

    def create_thread(self, target: Callable, args: tuple = ()) -> threading.Thread:
        """Create a new thread.

        :param target: function the thread will execute
        :param args: arguments passed to the target function
        :return: the created thread object
        """
        thread = threading.Thread(target=target, args=args)
        self.threads.append(thread)
        return thread

    def start_all(self) -> None:
        """Start all managed threads."""
        for thread in self.threads:
            thread.start()

    def join_all(self) -> None:
        """Wait for all managed threads to finish."""
        for thread in self.threads:
            thread.join()


def thread_demo():
    def worker(name: str, sleep_time: int) -> None:
        """Thread worker function."""
        print(f"Thread {name} starting")
        time.sleep(sleep_time)
        print(f"Thread {name} finished")

    # Create the thread manager
    manager = ThreadManager()

    # Create several threads
    for i in range(3):
        manager.create_thread(target=worker, args=(f"Thread-{i}", i))

    # Start them all and wait for completion
    manager.start_all()
    manager.join_all()
```
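To see why threads help with IO-bound work, a minimal sketch like the following (the timing harness is illustrative and not part of the original example) compares running the same sleeping worker sequentially and in threads:

```python
import threading
import time


def worker(name: str, sleep_time: int) -> None:
    time.sleep(sleep_time)  # simulate a blocking IO call


def compare() -> None:
    # Sequential: total time is roughly the sum of the sleeps (0 + 1 + 2 ≈ 3s)
    start = time.perf_counter()
    for i in range(3):
        worker(f"seq-{i}", i)
    print(f"sequential: {time.perf_counter() - start:.2f}s")

    # Threaded: total time is roughly the longest sleep (≈ 2s)
    start = time.perf_counter()
    threads = [threading.Thread(target=worker, args=(f"thr-{i}", i)) for i in range(3)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(f"threaded:   {time.perf_counter() - start:.2f}s")


if __name__ == "__main__":
    compare()
```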
### 1.2 Thread Synchronization Primitives
```python
import time
from threading import Lock, RLock, Condition, Event, Semaphore


class ThreadSafe:
    """Demonstrates thread-safe access to a shared resource."""

    def __init__(self):
        self._lock = Lock()
        self._rlock = RLock()
        self._condition = Condition()
        self._event = Event()
        self._semaphore = Semaphore(2)
        self._resource = 0

    def lock_example(self) -> None:
        """Use a Lock for mutually exclusive access."""
        with self._lock:
            self._resource += 1
            print(f"Resource value: {self._resource}")

    def rlock_example(self) -> None:
        """Use an RLock for re-entrant locking."""
        with self._rlock:
            with self._rlock:  # the same thread may acquire the lock again
                self._resource += 1

    def condition_example(self) -> None:
        """Use a Condition to wait for a predicate."""
        with self._condition:
            # Block until the predicate becomes true
            self._condition.wait_for(lambda: self._resource > 5)
            print("Condition met, continuing")

    def event_example(self) -> None:
        """Use an Event for one-shot signalling."""
        self._event.wait()  # block until the event is set
        print("Event was triggered")

    def semaphore_example(self) -> None:
        """Use a Semaphore to limit concurrent access."""
        with self._semaphore:
            print("Semaphore acquired")
            time.sleep(1)
            print("Semaphore released")
```
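The class above only defines the primitives; a short sketch of how a `Lock` and an `Event` might actually coordinate threads (the `increment`/`waiter` helpers are purely illustrative) could look like this:

```python
import threading

counter = 0
counter_lock = threading.Lock()
done = threading.Event()


def increment(times: int) -> None:
    """Increment the shared counter, holding the lock for each update."""
    global counter
    for _ in range(times):
        with counter_lock:
            counter += 1


def waiter() -> None:
    """Block until the main thread signals that all workers have finished."""
    done.wait()
    print(f"final counter value: {counter}")  # always 4000 thanks to the lock


if __name__ == "__main__":
    watcher = threading.Thread(target=waiter)
    watcher.start()

    workers = [threading.Thread(target=increment, args=(1000,)) for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()

    done.set()      # wake the waiter thread
    watcher.join()
```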
## 2. Multiprocessing 🔄

Multiprocessing is well suited to CPU-bound tasks; Python's `multiprocessing` module provides powerful support for working with multiple processes.

```python
from multiprocessing import Process, Pool, Queue
import os
import time


def square(x: int) -> int:
    """Module-level helper so the pool can pickle it (lambdas cannot be pickled)."""
    return x * x


def producer(queue: Queue) -> None:
    """Producer process: put items on the queue."""
    for i in range(5):
        queue.put(f"item-{i}")
        time.sleep(1)
    queue.put(None)  # sentinel telling the consumer to stop


def consumer(queue: Queue) -> None:
    """Consumer process: take items off the queue until the sentinel arrives."""
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"Consumed: {item}")


class ProcessManager:
    """Process manager: helpers for creating and coordinating processes."""

    @staticmethod
    def process_worker(name: str) -> None:
        """Process worker function.

        :param name: process name
        """
        print(f"Process {name} (PID: {os.getpid()}) starting")
        time.sleep(2)
        print(f"Process {name} finished")

    @staticmethod
    def pool_example() -> None:
        """Process pool example."""
        with Pool(processes=4) as pool:
            # Distribute the work across the pool's worker processes
            results = pool.map(square, range(10))
            print(f"Pool results: {results}")

    @staticmethod
    def queue_example() -> None:
        """Inter-process communication example."""
        # Create the queue and both processes (the worker functions live at
        # module level so they can be pickled on spawn-based platforms)
        q = Queue()
        p1 = Process(target=producer, args=(q,))
        p2 = Process(target=consumer, args=(q,))

        # Start both processes
        p1.start()
        p2.start()

        # Wait for the producer and the consumer to finish
        p1.join()
        p2.join()
```
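One caveat worth spelling out: on spawn-based platforms (Windows, and macOS by default) the code that creates processes must sit behind an `if __name__ == "__main__"` guard, otherwise child processes re-import the module and try to spawn processes again. A minimal usage sketch for the class above:

```python
if __name__ == "__main__":
    ProcessManager.pool_example()   # Pool results: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    ProcessManager.queue_example()  # Consumed: item-0 ... item-4
```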
## 3. Asynchronous IO 🌊

Python's `asyncio` module provides support for asynchronous programming and is particularly well suited to IO-bound tasks.

```python
import asyncio
from typing import List


class AsyncIOManager:
    """Asynchronous IO manager."""

    @staticmethod
    async def async_task(name: str) -> str:
        """Example asynchronous task.

        :param name: task name
        :return: task result
        """
        print(f"Task {name} starting")
        await asyncio.sleep(1)  # simulate an IO operation
        print(f"Task {name} finished")
        return f"Result-{name}"

    async def run_tasks(self) -> List[str]:
        """Run several asynchronous tasks concurrently.

        :return: list of task results
        """
        tasks = [
            self.async_task(f"Task-{i}")
            for i in range(3)
        ]
        results = await asyncio.gather(*tasks)
        return results
```
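A coroutine only runs inside an event loop; a minimal sketch for driving `AsyncIOManager.run_tasks` (the printed ordering of the start messages may vary):

```python
if __name__ == "__main__":
    manager = AsyncIOManager()
    # asyncio.run() creates the event loop, runs the coroutine, and closes the loop.
    results = asyncio.run(manager.run_tasks())
    print(results)  # ['Result-Task-0', 'Result-Task-1', 'Result-Task-2']
```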
## 4. Hands-On Example: A Concurrent Downloader 🚀

Putting all of the concepts above together, let's build an efficient concurrent downloader.

```python
import asyncio
import os
import threading
from dataclasses import dataclass
from typing import Dict, List

import aiohttp


@dataclass
class DownloadTask:
    """Data class describing a download task."""
    url: str
    filename: str
    size: int = 0
    downloaded: int = 0
    status: str = 'pending'


class ConcurrentDownloader:
    """Concurrent downloader built on asyncio and aiohttp."""

    def __init__(self, save_dir: str = './downloads'):
        self.save_dir = save_dir
        self.tasks: Dict[str, DownloadTask] = {}
        self.lock = threading.Lock()
        os.makedirs(save_dir, exist_ok=True)

    async def async_download(self, url: str, filename: str) -> None:
        """Download a single file asynchronously.

        :param url: download URL
        :param filename: name of the file to save
        """
        task = DownloadTask(url=url, filename=filename)
        self.tasks[url] = task

        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    if response.status == 200:
                        task.size = int(response.headers.get('content-length', 0))
                        filepath = os.path.join(self.save_dir, filename)

                        with open(filepath, 'wb') as f:
                            while True:
                                chunk = await response.content.read(8192)
                                if not chunk:
                                    break
                                f.write(chunk)
                                task.downloaded += len(chunk)

                        task.status = 'completed'
                    else:
                        task.status = 'failed'
        except Exception as e:
            print(f"Download error for {url}: {e}")
            task.status = 'failed'

    async def batch_download(self, urls: List[Dict[str, str]]) -> None:
        """Download several files concurrently.

        :param urls: list of dicts with 'url' and 'filename' keys
        """
        tasks = [
            self.async_download(item['url'], item['filename'])
            for item in urls
        ]
        await asyncio.gather(*tasks)

    def get_progress(self) -> Dict[str, Dict]:
        """Return progress information for all tasks."""
        with self.lock:
            return {
                url: {
                    'filename': task.filename,
                    'size': task.size,
                    'downloaded': task.downloaded,
                    'status': task.status,
                    'progress': (task.downloaded / task.size * 100) if task.size > 0 else 0,
                }
                for url, task in self.tasks.items()
            }


# Usage example
async def download_demo():
    """Downloader demo."""
    downloader = ConcurrentDownloader()

    # Prepare the download tasks
    urls = [
        {'url': 'http://example.com/file1.zip', 'filename': 'file1.zip'},
        {'url': 'http://example.com/file2.pdf', 'filename': 'file2.pdf'},
    ]

    # Start downloading
    await downloader.batch_download(urls)

    # Check the final progress
    progress = downloader.get_progress()
    print("Download progress:", progress)


if __name__ == "__main__":
    # Run the asynchronous download demo
    asyncio.run(download_demo())
```
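Because `batch_download` is awaited to completion, `get_progress` in the demo only reports the final state. To poll progress while downloads are still running, a sketch like the following (the `download_with_progress` helper and the 0.5s polling interval are illustrative, not part of the original design) wraps the batch in a background task:

```python
async def download_with_progress(downloader: ConcurrentDownloader,
                                 urls: List[Dict[str, str]]) -> None:
    # Run the batch download in the background and poll progress until it finishes.
    batch = asyncio.create_task(downloader.batch_download(urls))
    while not batch.done():
        for url, info in downloader.get_progress().items():
            print(f"{info['filename']}: {info['progress']:.1f}% ({info['status']})")
        await asyncio.sleep(0.5)
    await batch  # propagate any exception raised during the batch
```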
## 5. Best Practices and Caveats ⚠️
- Thread usage tips:
  - The GIL limits what Python multithreading can do for CPU-bound tasks
  - Threads are best suited to IO-bound work
  - Watch out for thread-safety issues and deadlocks
- Process usage tips:
  - Processes are a good fit for CPU-bound tasks
  - Be aware of the overhead of inter-process communication
  - Use process pools sensibly
- Asynchronous IO tips:
  - Ideal for workloads with large numbers of IO operations
  - Avoid blocking calls inside coroutines (see the sketch after this list)
  - Prefer asynchronous libraries over their synchronous counterparts
- Performance tuning:
  - Pick the concurrency model that matches the nature of the task
  - Avoid excessive concurrency
  - Set sensible timeouts and retry policies (also covered in the sketch below)
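As a concrete illustration of the asyncio points above, here is a minimal, hedged sketch (the `fetch_blocking` and `fetch_with_retry` helpers are purely illustrative) that pushes a blocking call off the event loop with `asyncio.to_thread` (Python 3.9+), bounds each attempt with a timeout, and retries a few times:

```python
import asyncio
import urllib.request


def fetch_blocking(url: str) -> bytes:
    """A deliberately blocking helper; calling it directly in a coroutine would stall the loop."""
    with urllib.request.urlopen(url) as resp:
        return resp.read()


async def fetch_with_retry(url: str, attempts: int = 3, timeout: float = 5.0) -> bytes:
    """Run the blocking fetch in a worker thread, with a timeout and simple retries."""
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            # to_thread keeps the event loop responsive; wait_for bounds each attempt.
            return await asyncio.wait_for(asyncio.to_thread(fetch_blocking, url), timeout)
        except (asyncio.TimeoutError, OSError) as e:
            last_error = e
            print(f"Attempt {attempt} failed: {e}")
            await asyncio.sleep(1)  # simple fixed backoff between attempts
    raise RuntimeError(f"All {attempts} attempts failed") from last_error


if __name__ == "__main__":
    data = asyncio.run(fetch_with_retry("http://example.com"))
    print(f"Fetched {len(data)} bytes")
```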
## 6. Further Learning 🎯
- Dig deeper into Python's GIL mechanism
- Explore distributed computing frameworks
- Learn about reactive programming
- Study concurrency design patterns
- Look into other async frameworks (such as Trio)
With this article you now have the core concepts and practical techniques of Python concurrent programming under your belt. Keep exploring and practicing, and you will be able to build more efficient concurrent applications! 🐍✨
If you found this article helpful, please like and share it, and I'd love to hear your thoughts and suggestions in the comments! 👇
See you in the next post!