我是java系的,会点儿go,因为项目的原因,开始接触python,赶鸭子上架,也接触了python不少内容,其中就包括多线程和多进程,我介绍和记录一些我的实践,方便自己的同时,希望能帮助到其它的小伙伴。
我先介绍一下我的应用场景,我需要用python解析大量的excel数据,20-30万行数据,然后构造一个请求的地址,用http调用,返回结果,连同原始数据一起生成excel。在生成excel的时候,为了保持原始数据顺序性,我会先把所有的请求结果用map存储在内存中,map的key为excel顺序号,value为url的结果。然后我会再次解析excel,从map中获取结果,进而生成excel。
为什么要研究多线程或者多进程呢,因为数据比较多,我需要用多进程或者多线程并发访问url。
在研究python多线程的时候,我发现因为GIL的原因,不管机器有多少个核,python只能用其中一个,如果想用上多个核的话,需要用python多进程。python多线程适合用做io密集型的任务,多进程适合用做cpu密集型的任务。我的场景是请求url,是一个io密集型的操作,按推荐来讲,我应该使用多线程,但我不确定GIL会在多大程度上影响我,在我遇到的业务场景中,也很少使用多进程的方式(除了跨主机多进程) 所以我打算用把多进程尝尝鲜。
在进程间数据共享的方式上,我选择用共享内存的方式(后来证明这是一个不太好的方式)。
我是这样使用多进程的
# 多进程核心代码# 读取excelexcel_data = pd.read_excel(path, sheet_name)# 共享内存smm = Manager()response_map = smm.dict()# 用来存储生成excel的数据data = {}# 并发调用http接口http_pool = multiprocessing.Pool(processes=2)for index, row in excel_data.iterrows():doc_id = row['index']address = row['address']http_pool.apply_async(process_task, args=(doc_id, address, response_map))http_pool.close()http_pool.join()# 多进程请求url
def process_task(doc_id, address, response_map):# 请求地址response = http_call(address)if doc_id not in response_map.keys():response_map[doc_id] = responseelse:pass
首先说多进程的好处
1. 进程之间天然是隔离的,不需要用锁来控制各个请求,我从来没发现请求有驴唇不对马嘴的情况。
其次就是多进程的坏处
1. 程序运行起来之后,我发现一个现象,随着时间的推移,程序运行的越来越慢。我仔细思考了一下,我觉得是因为进程间共享数据比较消耗资源,随着response_map中数据越来越多,程序肉眼可见的变慢了。
2. 程序长时间运行的时候报错MemoryError,但因为报错日志没有保存下来,具体错误原因不详,但我隐约感觉可能跟多进程共享大量数据有关系。
之后我果断转向多线程
我是这样使用多线程的
# 读取excel数据excel_data = pd.read_excel(path, sheet_name).astype(str)# 请求统计counter = {"total": 0,"repeat_num": 0,"empty_num": 0,"error_num": 0}# 并发调用http接口futures = {}response_map = {}with concurrent.futures.ThreadPoolExecutor(max_workers=process) as executor:for index, row in excel_data.iterrows():doc_id = row['index']address = row['address']# 发送请求future = executor.submit(process_task, doc_id, address, response_map, index, counter)futures[future] = doc_idfor future in concurrent.futures.as_completed(futures):pass
# 多线程执行代码
def process_task(doc_id, ec_addr, address, response_map, index, counter):try:with lock:counter['total'] += 1if ec_addr == '':counter['empty_num'] += 1returnif doc_id in response_map.keys():counter['repeat_num'] += 1returnresponse = http_call(address, index)with lock:response_map[doc_id] = responseexcept Exception:with lock:counter['error_num'] += 1
使用多线程之后,程序越运行越慢的问题解决了,MemoryError的错误在也没有见过。副作用是需要仔细lock好共享数据,不然会有请求和结果对不上的情况。
感谢python,让我对多进程以及进程之间通过内存共享数据有了直观的感受。