一、前言
场景1:一个网络爬虫,顺序爬取一个网页花了一个小时,采用并发下载就减少到了20分钟。
场景2:一个应用软件优化前每次打开网页需要3秒,采用异步并发提升到了200毫秒。
假设一个工程的工作量为100,不采用并发编程就相当于由一个人去完成这个工作量为100的所有工作内容,可能需要1个小时来做完。
但是还是这工作量为100的工程,我们采用并发编程就相当于是由2个人或者3个人去共同完成这份100工作量的工作,可能这份工作只需要半个小时就能做完。
总之引入并发就相当于提升程序进行速度。
二、程序提速的方法
1、单线程串行
正常情况我们写出的程序是单线程执行的,比如一个线程执行开始后开始CPU【运算器和控制器】执行,之后进行IO操作,在IO完成之后CPU再次进行运算,也就是说在这个线程的执行过程中CPU与IO是不能同时工作的。
缺点:在整体时间上有些浪费,在CPU工作时,IO并不工作,如果他们能共同工作就好了。
2、多线程并发 【多线程】
此时情况为在CPU工作时,IO同样能进行。CPU进行工作,当运行到IO操作后,此时就会有一个新的task去执行IO操作,CPU也可以继续执行自己的运算,当IO完成后也会通知CPU进行下一步的处理。
注意:CPU运算与IO操作,这两个是可以同时并行执行的,也就是说在IO读取内存磁盘的时候,这个过程是不需要CPU参与的,CPU与IO可以同时处理自己工作,这相对于单线程串行来说就实现了并行的加速效果,更加合理的利用了时间资源。
3、多CPU并行 【多进程】
这种多核CPU多条线同时处理我们的程序,这种并行的方式才是真正的并发,多个CPU就相当于多个工人,工人多了效率自然就高了。
4、多机器并行
这种方法更加通俗易懂,多CPU并行相当于一个工厂里多个工人,而多机器并行相当于多个工厂,所以效率也就更高了。
三、程序、进程、线程的概念
1、基本概念
①程序:
人编写的,是静态的,没有生命周期,在磁盘上存放,由一系列指令和数据组成的文件,这些文件可以被操作系统加载到内存中并执行。
②进程:
进程就是程序的一次执行过程,一段程序的每一次运行都会产生一个或多个进程。
③线程:
是进程中的一个实体,是程序执行的一条分支,也是程序执行流的最小单元,被系统独立调度和分派。
2、区分:
①进程与程序:
进程是有生命期的,大部分会随着程序的运行而创建,随看程序的结束而终终止,也可以去手动结束进程。
在操作系统中,进程是操作系统进行资源分配和调度的基本单位。
每个进程都有自已的私有地址空间、执行堆栈、程序计数器、局部变量以及其他系统资源(如文件描述符、网络连接等)等。
通俗的说,一个正在运行的程序就是一个进程,比如QQ、微信等,但也有可能这个程序会生成多个进程。
②进程与线程:
线程自己不拥有系统资源,只拥有一点儿在运行中必不可少的资源,如调用栈、寄存器环境等,但它可与同属一个进程的其它线程共享进程所拥有的全部资源,如虚拟地址空间、文件描述符等。
③并发性:
线程之间可以并发执行,使系统具有更好的并发性,从而能更有效地使用系统资源和提高系统吞吐量。
④系统开销:
线程的创建、撤消和切换的开销远小于进程,因为线程不涉及存储管理方面的操作。
⑤主线程与子线程:
程序启动时,操作系统会创建一个主线程,主线程可以产生其它子线程,子线程启动后会和主线程一起同时执行。
四、Python对并发编程的支持
1、多线程:
threading模块,利用CPU和IO可以同时执行的原理,让CPU不会干巴巴的等待IO完成。
2、多进程:
multiprocessing模块,利用多核CPU的能力,真正的并行执行任务。
3、异步IO:
asyncio模块,在单线程中利用CPU和IO同时执行的原理,实现函数异步执行。
4、Python还提供了辅助这些模块的函数:
- 多线程与多进程同时访问同一个文件同时写入的话就会冲突,可以使用Python提供的Lock来对资源进行加锁,防止冲突访问,就能实现顺序访问同一个文件。
- 使用Queue模块(队列)实现不同线程/进程之间的数据通信,实现生产者-消费者模式,比如实现一个生产者消费者模式来改造爬虫,生产者一边爬取,消费者就一边解析。
- 使用线程池/进程池,简化线程/进程的任务提交,等待结束,获取结果。
- 使用subprocess启动外部程序的进程,并进行输入输出交互。
五、Python怎样选择多线程,多进程与多协程
1、什么是CPU密集型计算,IO密集型计算
CPU密集型其实就是你程序的运行最终会受到CPU的限制,CPU是你程序运行的瓶颈,IO密集型则是你程序的运行最终会受到IO的限制,IO是你程序运行的瓶颈。
①CPU密集型(CPU-bound):
CPU密集型也叫计算密集型,是指IO操作在很短的时间内就能完成,CPU需要大量的计算和处理,而并不需要频繁的磁盘读写,网络传输等操作,特点是CPU占用率相当高。
例如:加密解密,图像处理,科学计算等需要大量的CPU运算能力以及较少的IO操作等任务。
并行优势:【多进程】CPU密集型任务可以通过并行在不同的处理器核心上同时执行提高性能。
②IO密集型(IO-bound):
IO密集型指的是系统运作大部分的状况是CPU在等IO(硬盘/内存)的读/写操作,CPU占用率低,不需要CPU进行大量的运算,CPU大部分时间通常是在等待IO操作的完成。
例如:文件处理程序,读写程序库程序,网络爬虫程序,用户输入等需要大量的IO操作而较少的CPU运算等任务。
并发优势:【多线程】IO密集型任务可以通过并发执行来提高效率,因为在等待一个IO操作完成时,CPU可以切换到另一个任务执行。
2、多线程,多进程与多协程的对比
①多进程 Process(multiprocessing)
优点:可以利用多核CPU并行运算
缺点:占用资源最多,可启动数目比线程少,受CPU的限制
适用:CPU密集型计算,你比如自己使用IO读取了数据,然后需要在CPU上运行大量的次数和时间这个时候就可以使用进程process来实现
②多线程 Thread(threading)
一个进程中可以启动N个线程
优点:相比进程,更加轻量级,占用资源少
缺点:相比进程:Python多线程只能并发执行,也就是说只能同时使用一个CPU,不能利用多CPU(因为一个叫GIL的东西)
相比协程:启动数目有限制,占用内存资源,有线程切换开销
适用:IO密集型计算,并且同时运行的任务数目要求不多
③多协程 Coroutine(asyncio)
在单线程内通过协作方式切换执行任务,实现函数异步执行。
优点:内存开销最小,启动协程数量最多
缺点:支持的库有限制,代码实现复杂
3、怎样根据任务选择对应技术
拿到我们的任务后,首先分析我们的任务特点:
①如果任务需要大量的CPU运算:
那他就属于我们的CPU密集型,此时我们采用多进程multiprocessing来实现。
②如果任务需要大量的IO操作:
那他就属于我们的IO密集型了,此时我们应该采用多线程threading来实现或者多协程asyncio来实现。
对于多线程与多协程我们应该如何选择呢,一般如果我们符合以下三点,我们就会使用多协程来实现,否则就会采用多线程来实现
这三点分别是:
①是否有超多任务量
②是否有线程的协程库来支持
③协程实现的复杂度是不是能够接受
如果这三点都能够满足,那么我们不妨尝试使用协程来实现我们的任务,因为我们的协程是一个新的技术,性能会最好。
六、Python多线程的使用
1、Python创建多线程的方法
import threading
import time# 定义两个线程要执行的函数
def task(name):for i in range(5):print(f"Task {name}: {i}")time.sleep(0.1) # 添加微小的延迟以确保线程交替# 创建线程
thread1 = threading.Thread(target=task, args=("A",))
thread2 = threading.Thread(target=task, args=("B",))# 启动线程
thread1.start()
thread2.start()# 等待线程完成
thread1.join()
thread2.join()print("All threads finished.")
输出结果:
总结:
①准备一个函数。
②使用 import threading 创建一个线程对象 t 。
③使用 t.start() 来启动线程。
④使用 t.join() 来等待线程的结束。
2、改写爬虫程序,变成多线程爬取
①我们写一个爬虫程序,爬取博客园的网页
import threading
import time
import requests# 定义需要爬取的 URL 列表
urls = [f"https://www.cnblogs.com/#p{page}" for page in range(1, 50+1)]def craw(url):"""爬取单个 URL 的内容"""try:r = requests.get(url, timeout=0.1)print(f"{url}爬取成功, 长度: {len(r.text)}")except requests.RequestException as e:print(f"爬取失败{url}: {e}")craw(urls[0])
运行结果:
可以看到爬取了第一页内容的长度是72550个,这是我们的爬虫函数。
②接下来我们写我们的线程函数,首先写单线程的函数
def single_thread():"""单线程爬取"""print("单线程抓取已启动")for url in urls:craw(url)print("单线程抓取完成")
这是我们的单线程爬取的函数,单线程会顺序爬取网页,在线程开始之前有打印语句,在线程结束后我们同样写一行打印语句方便我们观察现象
③继续写我们的多线程函数
def multi_thread():"""多线程爬取"""print("已开始多线程抓取")threads = [threading.Thread(target=craw, args=(url,)) for url in urls]# 启动所有线程for thread in threads:thread.start()# 等待所有线程完成for thread in threads:thread.join()print("多线程爬取完成")
这是我们的多线程爬取函数,我们每个网页都开启一个线程来进行爬取,顺序是未知的,每个开始结束都有打印语句进行提醒
3、速度对比
在单线程与多线程中,爬取相同的内容的时间肯定是不同的,此时我们就可以使用time模块来进行计时,分别统计单线程爬取的时间与多线程爬取的时间进行对比
对应的函数如下:
if __name__ == "__main__":# 单线程测试print("开始单线程测试……")start_time = time.time()single_thread()end_time = time.time()print(f"单线程需要时间{end_time - start_time:.2f}秒\n")# 多线程测试print("开始多线程测试……")start_time = time.time()multi_thread()end_time = time.time()print(f"多线程需要时间{end_time - start_time:.2f}秒\n")
此时我们执行程序现象如下:
两者对比我们可以看出多线程比单线程时间快了几十倍!
七、Python实现生产者消费者模型多线程爬虫
1、多组件的Pipeline技术框架
Pipeline是什么?
对于我们来说,一个复杂的工作,我们一般都会把这一整个大型的工程划分为多个小工程,这样一步一步的完成我们的工作,如下图所示,像这样将一整个工作分步完成,降低整体的负责度。
我们输入数据,中间经过多个信息处理器(也就是处理模块)然后每个处理器输出中间数据,然后各个不同的处理器之间通过中间数据进行数据交互。
像这样将整个工程分多个模块来进行处理的这样的架构就叫做Pipeline,而每个处理器也有一个名字叫做Processor
我们的生产者消费者就是一个典型的Pipeline架构,上图中对于中间数据1来说,处理器1就是生产者,处理器2就是消费者 ,生产者有自己的原料也就是输入数据,而消费者也有自己的输出作为自己的输出数据。
2、生产者消费者爬虫的架构
Pipeline对应到Python爬虫的生产者消费者架构就是说里面有两个Processor,第一个Processor获取待爬取的URL进行网页的下载,他将下载好的内容放到一个下载好的网页队列。
其次消费者拿到这个网页队列然后去进行网页的解析,并且将解析的结果进行存储。
这样开发的好处就是生产者和消费者可以有两拨不同的人进行开发并且可以配置不同的资源。
那么这两个线程之间他们的中间数据是如何进行传递的,也就是多线程间的通信方式是什么呢?
3、多线程数据通信的queue.Queue
多线程间的通信方式可以使用queue.Queue来进行。
queue.Queue可以用于多线程之间的,线程安全的数据通信(线程安全指的是,多个线程并发同时的访问数据并不会出现冲突)。
那么queue.Queue是如何使用的呢
1.导入类库
import queue
2.创建Queue
q = queue.Queue()
3.添加元素
q.put(item)
4.获取元素
item = q.get()
5.查询状态
q.qsize() 查看元素的多少
q.empty() 查看元素的多少
q.full() 判断是否为满
4、代码编写实现生产者消费者爬虫
我们继续使用上节课的代码来进行,首先改写我们的craw函数,让函数返回获取到的url文本,如下图:
import threading
import time
import requestsurls = [f"https://www.cnblogs.com/#p{page}" for page in range(1, 50+1)]
# 生产者
def craw(url):r = requests.get(url)return r.text
此时得到文档内容后,我们应该如何解析呢,让我们查看一下这个网页,网页中包含很多的文章标题,我们可不可以将这个标题给提取下来呢?
我们可以在标题的位置右键点击,然后选择检查,查看一下标题对应的标签等信息,如下图
信息包含一个a标签,也就是链接,他的class是‘post-item-title’内部还包含他的url以及标题信息,在这里我们就想要解析一下,然后得到每一篇文章的标题和连接。
好的,继续编写我们的代码, 我们重新编写一个名为parse的函数,他的参数也就是已经下载好的html,那么我们应该如何解析这个内容呢,这里使用了一个Beautifulsoup,BeautifulSoup是一个可以从HTML或XML文件中提取数据的Python库。【安装 beautifulsoup4 版本4.12】
html.parser是python标准库中的解析器,作用是将 HTML 文档解析为一个 BeautifulSoup 对象。"html.parser" 是Python标准库中的一个简单、快速的HTML解析器,适用于大多数一般情况。
soup.find_all则是返回所有匹配到的结果,这里我们匹配的是a标签 class则是标题对应的class。
最后我们返回一个元组,内容是提取的每个链接的 href 属性和文本内容。
# 消费者
def parse(html):soup = BeautifulSoup(html, "html.parser")links = soup.find_all("a", class_="post-item-title")return [(link["href"], link.get_text()) for link in links]
此时两个函数就已经写好了,craw就是我们的生产者,他的生产结果是每个html,而parse就是我们的消费者,他的工作是将每个html解析出来,并将解析到的链接和标题组成一个元组然后返回。
此时我们测试一下这两个函数,代码如下所示。
# 调用
if __name__ == '__main__':for result in parse(craw(urls[2])):print(result)
结果如下:打印出来网页中的所有标题以及连接。
那么接下来,我们继续编写我们Python多线程的生产者消费者爬虫。
首先我们把刚才打印网页中标题以及链接的文件,修改为blog_spider.py,作为我们的基础爬虫模块,然后新建一个文件,调用blog_spider.py。
在新建文件中,写两个函数,第一个是do_craw,这就是一个Processor,也就是我们的生产者,他的两个参数就是他的输入队列以及输出队列,我们对参数标明一下参数的类型方便我们的方法的提示。
函数开始运行以后,我们获取到url之后,调用blog_spider.craw获取html,然后把下载到的html文本放到html_queue中。
def do_craw(url_queue: queue.Queue, html_queue: queue.Queue):while True:url = url_queue.get()html = blog_spider.craw(url)html_queue.put(html)# 打印相关日志print(threading.current_thread().name, f"craw {url}",\"url_queue.size=", url_queue.qsize())time.sleep(random.randint(1, 2))
其次我们编写我们的消费者,名称为do_parse 他的两个参数,第一个是生产者的输出队列,也就是html.Queue,另一个我们写一个文件对象,将我们的结果保存到文件当中。
我们调用blog_spider.parse解析html_queue中的html文本,然后我们将解析之后获取到的文章链接和标题,存入文档中。
def do_parse(html_queue: queue.Queue, fout):while True:html = html_queue.get()results = blog_spider.parse(html)for result in results:fout.write(str(result) + "\n")# 打印当前线程的名字print(threading.current_thread().name, f"results.size", len(results),\"html_queue_size=", html_queue.qsize())time.sleep(random.randint(1, 2))
注意:
添加time.sleep的作用: 使当前线程暂停执行指定的时间,单位是秒。代码中time.sleep(random.randint(1, 2))是让当前线程随机休眠1到2秒的时间。
①避免过于频繁的请求:
通过在每次请求之后添加一定的延迟,可以减轻对目标服务器的负担。频繁而快速的请求可能会导致服务器认为是恶意攻击或滥用,从而被封禁或限制访问。
②模拟人类操作:
这种随机的休眠时间有助于模拟真实用户的行为,而不是产生机械的、规律性的请求。模拟用户行为可以降低被目标网站检测到爬虫的概率。
③控制请求速率:
控制请求速率是爬虫礼仪的一部分。不仅是为了保护目标服务器,也是为了避免对网络流量造成过大的影响,尊重网站的服务策略。
ok,至此,我们这两个生产者消费者函数已经写好了,那么我们如何通过多线程把他给调动起来呢,
第一:两个队列:url.queue和html.queue,并且从url.queue中获取url。
第二:启动我们的生产者线程了,这里我们启动三个生产者线程,线程的target就是我们的do_craw, args也就是参数有两个,分别是url.queue,html.queue, 同时我们给这个线程起一个名字,分别是craw加一个数字,这是三个生产者线程。
第三:两个消费者线程,消费者线程的target为do_parse,args是html.queue,和一个文件输出对象fout,对于fout我们可以创建出一个文件来,用于保存我们的输出数据, 同时我们给这个线程起一个名字,分别是parse加一个数字,这是两个消费者线程。
# 现在准备开始开启生产者和消费者线程
if __name__ == '__main__':url_queue = queue.Queue()html_queue = queue.Queue()for url in blog_spider.urls:url_queue.put(url)# 开启生产者线程for idx in range(3):t = threading.Thread(target=do_craw, args=(url_queue, html_queue), name=f"craw{idx}")t.start()# 开启消费者线程和创建txt文件fout = open("spider_data.txt", "w")for idx in range(2):t = threading.Thread(target=do_parse, args=(html_queue, fout), name=f"parse{idx}")t.start()
此时执行我们的函数可以清晰的看到函数执行的过程,并且可以看到生产者线程要比消费者线程结束的要早,因为生成者我们创建了3个线程,而消费者我们创建了两个线程。
此时打开我们的spider_data.txt文件
打开后你会发现,里面呢就是我们爬取好的一行一行的数据,每一行都是一个url以及对应的标题
至此,你会发现,对于一个复杂的爬虫,我们可以分很多的模块,每一个模块都可以来让多个线程来进行处理,而他们之间是使用queue.Queue进行来进行交互的,通过queue.Queue,我们的主线程把数据放进去,然后把这个数据传给我们的生产者,我们生产者会产出中间的数据,而我们的消费者线程又对这个中间数据进行处理得到最终的产出,到此执行结束。
代码总展示
import requests
from bs4 import BeautifulSoupurls = [f"https://www.cnblogs.com/#p{i}" for i in range(1, 50+1)]# 生产者——下载网页内容
def craw(url):r = requests.get(url)return r.text# 消费者——拿着生产者下载好的网页内容,进行解析
def parse(html):soup = BeautifulSoup(html, "html.parser") # 指定为html解析器# 提取出所有class名为post-item-title 的 超链接 a标签links = soup.find_all("a", class_="post-item-title")# 遍历links元组,将超链接a标签的href链接内容 和 标签文字内容提取输出return [(link["href"], link.get_text()) for link in links]# 调用
if __name__ == '__main__':for result in parse(craw(urls[2])):print(result)
这是写好的简单爬虫——生产者 消费者函数
import threading
import time
import random
import queue
import blog_spiderdef do_craw(url_queue: queue.Queue, html_queue: queue.Queue):while True:url = url_queue.get()html = blog_spider.craw(url)html_queue.put(html)# 打印相关日志print(threading.current_thread().name, f"craw {url}","url_queue.size=", url_queue.qsize())time.sleep(random.randint(1, 2))def do_parse(html_queue: queue.Queue, fout):while True:# 从网页内容队列中 获取队头元素html = html_queue.get()results = blog_spider.parse(html)for result in results:fout.write(str(result) + "\n")# 打印当前线程的名字print(threading.current_thread().name, f"results.size", len(results),"html_queue_size=", html_queue.qsize())time.sleep(random.randint(1, 2))# 现在准备开始开启生产者和消费者线程
if __name__ == '__main__':url_queue = queue.Queue()html_queue = queue.Queue()for url in blog_spider.urls:url_queue.put(url)# 开启生产者线程for idx in range(3):t = threading.Thread(target=do_craw, args=(url_queue, html_queue), name=f"craw{idx}")t.start()# 开启消费者线程和创建txt文件fout = open("spider_data.txt", "w")for idx in range(2):t = threading.Thread(target=do_parse, args=(html_queue, fout), name=f"parse{idx}")t.start()
这是导入了上面的模块,写的运行函数,运行这个就可以了【用到了queue队列,队列+线程+requests 一边下载网页一边解析内容】