昨天的并行思想实现了以下需求:
1、可以手动控制并行调度的数量
2、在并行调度数量内,一个子进程执行成功之后,另一个子进程开始执行
3、最后需要返回执行成功和执行没有成功的进程
现在引入一个新的思想--封装,我们先来看看什么是封装:
上面说到的是有一个子程序需要运行N次,如果有N个子程序需要运行N次呢?这个时候又该怎么处理呢?这个时候用到了封装思想,把这个程序封装起来,如果有多少个子程序就实例化多少个这个程序,此时我们可以向deepseek这样提问:帮我把这个程序封装成一个class类。
下面是源码,可以自行增加子程序进行测试:
from concurrent.futures import ThreadPoolExecutor, as_completed
import timeclass ParallelProcessor:'''并行任务处理器(带完整异常处理) '''def __init__(self, func,params_list,max_workers=4):'''初始化配置:parammax_workers: 最大并行线程数(建议设置为CPU核心数×2)'''self.max_workers = max_workersself.func = funcself.params_list = params_listself.success = [] # 存储成功结果 (参数, 结果)self.failure = [] # 存储失败结果 (参数, 错误信息)def run(self):"""并行调度器:param params_list: 参数列表,每个元素会传给p_w_time:param max_workers: 最大并行数量:return: (成功列表, 失败列表)"""with ThreadPoolExecutor(max_workers= self.max_workers) as executor:# 创建任务映射字典 {future: 参数}future_to_param = {executor.submit(self.func, param): param for param in self.params_list}# 按完成顺序处理结果for future in as_completed(future_to_param):param = future_to_param[future]try:result = future.result()self.success.append((param, result))except Exception as e:self.failure.append((param, str(e)))return self.success, self.failuredef get_results(self):'''获取处理结果统计 '''return {"total": len(self.success) + len(self.failure),"success_count": len(self.success),"failure_count": len(self.failure)}
'''子程序,可以编写多个'''
def p_w_time(passage):time.sleep(2)print(time.strftime("%Y-%m-%d %H:%M:%S")+' : '+str(passage)+' ')'''子程序,可以编写多个'''
def p_w_time1(passage):time.sleep(2)print(time.strftime("%Y-%m-%d %H:%M:%S")+' : '+str(passage)+' ')if __name__ == "__main__":#print(range(1, 11))# 初始化处理器(设置并行数为3)processor = ParallelProcessor(p_w_time,[1,2,'3',4,5,6,7],max_workers=3)#processor._task_handler(p_w_time)# 执行任务(参数范围1-10)processor.run()# 获取执行结果results = processor.get_results()print(f'总任务数:{results[ "total"]}')print(f'成功数:{results[ "success_count"]}')print(f'失败数:{results[ "failure_count"]}')processor1 = ParallelProcessor(p_w_time1, [1, 2, 4, 5, 6, 7], max_workers=3)# processor._task_handler(p_w_time)# 执行任务(参数范围1-10)processor1.run()# 获取执行结果results1 = processor1.get_results()print(f'总任务数:{results1["total"]}')print(f'成功数:{results1["success_count"]}')print(f'失败数:{results1["failure_count"]}')
「清华大学第四弹:...一样简单.pdf」,复制整段内容,打开最新版「夸克APP」即可获取。
畅享原画,免费5倍速播放,支持AI字幕和投屏,更有网盘TV版。
/~bea235zB7i~:/
链接:https://pan.quark.cn/s/a39e9d7285ce