多线程爬虫
文章目录
- 多线程爬虫
- 概念
- 并行
- 并发
- Python多线程用途
- threading模块
- 小知识----函数体内pass的用处
- 1. **占位符**
- 2. **控制结构**
- 3. **定义接口**
- 总结
- 代码解读
- 单线程--串行
- 多线程--并行
- 查看当前程序的线程
- 让主函数等待子线程结束,再运行---.join()
- join()方法的基本用法
- 1. **基本示例**
- 2. **超时参数**
- 小结
- 多线程join等待
- 使用继承类实现多线程
- 最终结果
- 多线程共享全局变量的问题
- 锁机制和threading.Lock类
- threading.Lock 锁
- 使用锁的原则:
- 示例,用十个线程,对a累加1000000次
- Condition锁
- Queue队列
- 生产者和消费者模式
- 生产者与消费者案例
- 解读代码
- 生产者子类
- 消费者子类
- 主函数启动线程
- Condition的生产者和消费者模式-案例
- 代码微动
- 遇到的问题
- threading.current_thread().name的作用
- 1. 调试和日志记录
- 2. 线程管理
- 3. 线程识别
- 4. 便于维护
- 总结
- run(self): 和run(self) -> None:的区别
- 区别:
- 影响:
- 示例:
- 总结
- 多线程案例——多线程爬取小说
- 注意事项
- 代码展示
- 引入的库
- 全局变量
- 生产者————产生URL
- 生产者兼消费者————下载URL里面的小说到本地
- 消费者——合并小说
- 主函数——函数入口
希望读到这篇文章的小伙伴可以跟我交流一下哪里有疑惑,大家需要互相成长,毕竟金无足赤,人无完人
概念
一般情况下,一个程序有一个进程和一个线程,代码依次线性执行
多线程可以并发执行,一次性多个人做多件事
并行
多件事同时刻发生,利用多个CPU,就像短跑一样,一组选手同时起步
并发
多件事同一时间段发生,利用一个CPU,就像接力赛一样,在一圈内有好几个人跑步,这里把一圈看成一个时间段
虽然总的运行时间没有减少,那也要分情况
上面的情况就是:CPU密集型
这里是因为都是CPU自己在工作,所以怎么切割都没用
对于下面的情况就有作用了
IO密集型
所谓的IO就是用户的输入和输出,不需要CPU的工作,比如,使用requests模块时,我们输入一个url,在得到服务器的返回值时,程序什么都不用执行
上面的线程1执行完之后,进行IO操作,CPU就被释放,不再占用CPU资源了
在执行A2操作时,B1也可以同时执行,然后把B1挂起,等待A2执行结束,执行B2,在执行C1,挂起C1,等待C2执行。。。可以缩短时间
Python多线程用途
Python自带的解释器是Cpython,并不支持真正意义上的多线程。Cpython提供了多线程包,包含一个叫Global Interpreter Lock(GIL)锁,它能确保你的代码中永远只有一个线程在执行。经过G的处理,会增加执行的开销。这就意味着如果你先要提高代码执行效率,使用treading不是一个明智的选择当然如果你的代码是IO密集型,比如爬虫,多线程可以明显提高效率,相反如果你的代码是CPU密集型,比如大量计算类型,这种情况下多线程反而没有优势,建议使用多进程。
threading模块
threading模块是python中专门提供用来做多线程编程的模块。threading模块中最常用的类是Thread。
- 用treading模块直接写一个多线程程序
- threading模块下的Thread类,继承自这个类,然后实现run方法,线程就会自动运行run方法中的代码
小知识----函数体内pass的用处
在 Python 中,pass
是一个空语句,用于表示"无操作"。它可以在需要某种语法结构但又不想执行任何操作的地方使用。常见的用法包括以下几种情况:
1. 占位符
有时在定义函数或类时,可能还未实现具体的逻辑。可以使用 pass
作为占位符,确保代码能够正常运行。
def my_function():pass # 暂未实现class MyClass:pass # 暂未实现的类
2. 控制结构
在条件语句、循环、异常处理等结构中,如果你暂时不想处理某些分支,也可以使用 pass
。
for i in range(5):if i == 2:pass # 你可以选择在这里执行某些操作,或者什么都不做else:print(i)try:# 可能会抛出异常的代码x = 1 / 0
except ZeroDivisionError:pass # 处理异常,但不做任何事情
3. 定义接口
在设计接口时,可能会有一些方法不需要立即实现。可以使用 pass
来定义这些方法。
class BaseClass:def method(self):pass
总结
pass
语句在编写 Python 代码时是非常有用的,它使得你的代码结构可以更容易地构建和调整,特别是在开发过程中。使用 pass
可以帮助你先搭建代码框架,然后再逐步实现具体功能。
代码解读
单线程–串行
import threading
import timedef singing(name,delay): # name填歌手,delay填延迟时间print(f'{name}开始唱歌')time.sleep(delay)print('结束唱歌')def dancing(name,delay): # name填舞者,delay填延迟时间print(f'{name}开始跳舞')time.sleep(delay)print('结束跳舞')def single_thread(): # 创建一个单线程singing('张学友',2)dancing('小狗',3)if __name__ == '__main__': # 写main关键字,作为函数入口# 计时strat_time = time.time()single_thread() # 调用单线程函数end_time = time.time()print(f'共消耗时间:{end_time-strat_time}')
多线程–并行
import threading
import timedef singing(name,delay): # name填歌手,delay填延迟时间print(f'{name}开始唱歌')time.sleep(delay)print('结束唱歌')def dancing(name,delay): # name填舞者,delay填延迟时间print(f'{name}开始跳舞')time.sleep(delay)print('结束跳舞')def multi_thread(): # 多线程# 线程1th1 = threading.Thread(target=singing,args=('张学友',2)) # 调用threading模块的Thread方法# 在第一个参数target后面写函数名,不要加括号!不要加括号!不要加括号!写了括号就不是指定函数了,就变成调用函数了# 第二个参数args后面以元组的形式,填入函数的参数,# args即使只有一个参数,也必须用元组的形式填入,必须有一个逗号(参数1,) 也可以是(参数1,参数2)# args要接收一个元组,只传一个参数,不加逗号,就相当于把参数传给args,不是元组就会报错th1.start() # 上面这一步只是建立一个对象,这行语句才是刚刚执行# 线程2th2 = threading.Thread(target=dancing,args=('小狗',3))th2.start()if __name__ == '__main__': # 写main关键字,作为函数入口# 计时strat_time = time.time()multi_thread() # 调用多线程end_time = time.time()print(f'共消耗时间:{end_time-strat_time}')
运行结果是:
张学友开始唱歌
小狗开始跳舞
共消耗时间:0.002039194107055664
结束唱歌
结束跳舞
很明显,不是我们想要的结果,因为在这里有三个线程同时执行
主线程main,子线程th1,子线程th2
主线程只是负责调用子线程,运行的很快,而子线程还在延迟中,我们现在查看的是主线程的运行时间
查看当前程序的线程
print(threading.enumerate())
使用这个程序可以看到程序运行到这个语句时的所有线程
if __name__ == '__main__': # 写main关键字,作为函数入口# 计时strat_time = time.time()multi_thread() # 调用多线程print(threading.enumerate())end_time = time.time()print(f'共消耗时间:{end_time-strat_time}')
加上这条语句之后,就可以看到程序的此刻的所有线程了
张学友开始唱歌
小狗开始跳舞
[<_MainThread(MainThread, started 22976)>, <Thread(Thread-1 (singing), started 2784)>,<Thread(Thread-2 (dancing), started 23204)>]
共消耗时间:0.0009999275207519531
结束唱歌
结束跳舞
不难发现,分别是主函数main,子线程singing,子线程dancing
让主函数等待子线程结束,再运行—.join()
def multi_thread(): # 多线程th1 = threading.Thread(target=singing,args=('张学友',2)) th1.start()th2 = threading.Thread(target=dancing,args=('小狗',3))th2.start()th1.join()th2.join()if __name__ == '__main__': # 写main关键字,作为函数入口# 计时strat_time = time.time()multi_thread() # 调用多线程,有join控制时,会一直等待join的子线程print(threading.enumerate())end_time = time.time()print(f'共消耗时间:{end_time-strat_time}')张学友开始唱歌
小狗开始跳舞
结束唱歌
结束跳舞
[<_MainThread(MainThread, started 22236)>]
共消耗时间:3.004053831100464
在 Python 的 threading
模块中,join()
方法用于等待线程完成。它的主要作用是阻塞调用这个方法的线程(通常是主线程),直到被调用的线程执行完毕。这样可以确保程序按预期顺序运行,特别是在多个线程之间有依赖关系时。
join()方法的基本用法
1. 基本示例
import threading
import timedef worker():print("线程正在运行")time.sleep(2)print("线程完成")# 创建线程
t = threading.Thread(target=worker)# 启动线程
t.start()# 主线程等待t线程完成
t.join()print("所有线程完成")
在这个示例中,主线程会等待 t
线程执行完毕,输出如下:
线程正在运行
线程完成
所有线程完成
2. 超时参数
join()
方法还可以接受一个可选的超时参数,表示等待的最大时间。如果在超时时间内线程未完成,主线程将继续执行。
import threading
import timedef worker():print("线程正在运行")time.sleep(5)print("线程完成")# 创建线程
t = threading.Thread(target=worker)# 启动线程
t.start()# 主线程等待t线程完成,最多等待2秒
t.join(timeout=2)if t.is_alive():print("线程还在运行,超时了!")
else:print("线程已完成")
在这个示例中,主线程等待 t
线程最多 2 秒。如果 t
线程还在运行,主线程会输出 “线程还在运行,超时了!”。
小结
- join() 方法阻塞调用线程,直到被调用线程完成。
- 可以使用可选的 timeout 参数设置最大等待时间。
- 在多线程编程中,使用
join()
可以确保线程之间的执行顺序和依赖关系。
多线程join等待
def multi_thread(): # 多线程# 创建空链表存线程名称task = []# 线程1th1 = threading.Thread(target=singing,args=('张学友',2))th1.start()task.append(th1)# 线程2for i in range(3):th2 = threading.Thread(target=dancing,args=('小狗',3))th2.start()task.append(th2)for t in task: # 让所有的线程使用joint.join()if __name__ == '__main__':# 计时strat_time = time.time()multi_thread()print(threading.enumerate())end_time = time.time()print(f'共消耗时间:{end_time-strat_time}')
首先创建一个空列表把这些线程放进去,再使用start函数,启动线程
最后,使用for循环,把列表内的所有线程挂上join,让主函数等待子线程完成
张学友开始唱歌
小狗开始跳舞
小狗开始跳舞
小狗开始跳舞
结束唱歌
结束跳舞
结束跳舞
结束跳舞
[<_MainThread(MainThread, started 15160)>]
共消耗时间:3.00308895111084
使用继承类实现多线程
import threading
import timeclass Singing(threading.Thread):def __init__(self,delay):super(Singing,self).__init__()self.delay = delaydef run(self):print(f'{threading.current_thread().name}开始唱歌')time.sleep(self.delay)print('结束唱歌')class Dancing(threading.Thread):def __init__(self,delay):super(Dancing,self).__init__()self.delay = delaydef run(self):print(f'{threading.current_thread().name}开始跳舞')time.sleep(self.delay)print('结束跳舞')def multi_thread():task = []th1 = Singing(3)th1.start()task.append(th1)for i in range(3):th2 = Dancing(4)th2.start()task.append(th2)for t in task:t.join()if __name__ == '__main__':start_time = time.time()multi_thread()end_time = time.time()print(f'总耗时:{end_time-start_time}')
这里使用继承类的方式,创建了两个子类Singing和Dancing
继承类的方式和原理,在我的爬虫系列一里面有讲解,不懂的可以去看看那篇文章
在子类里面,run函数是调用类的时候自动执行的,在这里我们希望自动执行的是开始唱歌开始跳舞
在run函数里面,我们还希望再加一些参数进去,实现一个可控的类对象,就需要用到init里面的super函数
init是一个初始化函数,我们在类的实例化时,需要先初始化类对象
这里的delay是我们希望控制的跳舞或者唱歌时间
使用super方法,在第一个参数写类名称,第二个参数写delay数据
因为是继承类,所以在调用Singing函数的时候,不需要再使用threading.Thread()方法了
最终结果
Thread-1开始唱歌
Thread-2开始跳舞
Thread-3开始跳舞
Thread-4开始跳舞
结束唱歌
结束跳舞
结束跳舞
结束跳舞
多线程共享全局变量的问题
问题: 多线程都是在同一个进程中运行的。因此在进程中的全局变量所有线程都是可共享的。这就造成了一个问题,因为线程执行的顺序是无序的,有可能会造成数据错误。
说人话,就是当我们创建一个全局变量的时候,在线程中调用了这个全局变量,又因为线程与线程之间没有办法相互联系,就可能导致数据出错
比如,一个全局变量a=1,在一个线程中,我将对他操作,让a=a+1,按理来说,a离开第一个线程时,a=2,然后其他线程再拿到a时,a应该为2,再传出来就是3
但是这些线程几乎同时拿到了a,又同时把a传出来,本来a应该为5,但是现在a还是2
所以,我们有了一个解决方法———锁机制
锁机制和threading.Lock类
threading.Lock 锁
为了解决共享全局变量的问题。treading提供了一个Lock类,这个类可以在某个线程访问某个变量的时候加锁,其他线程此时就不能进来,直到当前线程处理完后,把锁释放了,其他线程才能进来处理。
使用锁的原则:
把尽量少的和不耗时的代码放到锁中执行。
代码执行完成后要记得释放锁。
就比如刚刚的对a的操作,我们把对a的操作放到锁里面,当第一个线程拿到a的时候,这个锁就被锁定了,只有第一个线程对a操作完毕,这个锁才打开,第二个线程才能继续拿a依次类推,就保障了全局变量a有序的操作
示例,用十个线程,对a累加1000000次
这里必须是比较大的累加次数,我的电脑num至少是一百万次,才会使a成为乱数,不是整齐的1000000,2000000
如果你把num设置成一万或者十万,发现数字都是整整齐齐的,因为你的计算机性能很好,当第一个线程完成累加的时候,其他线程还没反应过来,只有线程一在对a操作
import threadinga = 0
def add_value(num):global a # 声明全局变量,因为程序不能在函数体里找到for i in range(num):a = a + 1print(f'{threading.current_thread().name}的a是{a}')def multi_thread():for i in range(10):th1 = threading.Thread(target=add_value,args=(1000000,))th1.start()if __name__ == '__main__':multi_thread()运行结果:
Thread-1 (add_value)的a是1495479
Thread-2 (add_value)的a是2516817
Thread-3 (add_value)的a是3000000
Thread-5 (add_value)的a是5485545
Thread-4 (add_value)的a是5516129
Thread-6 (add_value)的a是6532192
Thread-7 (add_value)的a是7466099
Thread-8 (add_value)的a是8431574
Thread-9 (add_value)的a是9533725
Thread-10 (add_value)的a是10000000
这里第一个线程累加结束的时候,其他线程已经在对a进行操作了,这个结果不是我们想看到的,这就线程产生的干扰
使用threading下面的lock锁方法
lock = threading.Lock() # 引入threading模块下的Lock方法
import threading
lock = threading.Lock() # 引入threading模块下的Lock方法
a = 0
def add_value(num):global a # 声明全局变量,因为程序不能在函数体里找到lock.acquire()for i in range(num):a = a + 1lock.release()print(f'{threading.current_thread().name}的a是{a}')def multi_thread():for i in range(10):th1 = threading.Thread(target=add_value,args=(1000000,))th1.start()if __name__ == '__main__':multi_thread()
这里在原来的基础上,加了个锁
在累加之前有lock.acquice()
累加之后有释放锁lock.release()
打印结果:
Thread-1 (add_value)的a是1000000
Thread-2 (add_value)的a是2000000
Thread-3 (add_value)的a是3000000
Thread-4 (add_value)的a是4000000
Thread-5 (add_value)的a是5000000
Thread-6 (add_value)的a是6000000
Thread-7 (add_value)的a是7000000
Thread-8 (add_value)的a是8000000
Thread-9 (add_value)的a是9000000
Thread-10 (add_value)的a是10000000
Condition锁
Queue队列
生产者和消费者模式
生产者和消费者模式是多线程开发中经常见到的一种模式。生产者的线程专门用来生产一些数据,然后存放到一个中间的变量中。消费者再从这个中间的变量中取出数据进行消费。通过生产者和消费者模式,程序分工更加明确线程更加方便管理。
就是把程序分开,各自完成自己的工作
比如之前下载美女图片的案例,有一段代码专门获取链接,有一段代码专门拼接链接,剩下一个代码专门下载链接到本地
前两个函数就是生产者,后面那个下载的就是消费者了
生产者与消费者案例
import threading
import time
import random # 引入随机数库lock = threading.Lock()total_money = 0 # 总额
max_number = 10 # 最大生产次数/最大消费次数
Producer_numbers = 0 # 生产者工作次数
Consumer_numbers = 0 # 消费者工作次数class Producer(threading.Thread): # 生产钱def run(self):global total_money,Producer_numbers,max_numberwhile True: # 使用while循环,让这个线程一直工作,一直产出lock.acquire()if Producer_numbers >= max_number: # 当生产次数达到10次的时候线程结束print(f'{threading.current_thread().name}完成了生产工作***********************')lock.release() # 在离开线程之前记得把锁释放掉returnmoney = random.randint(500,1000) # 使用random库的randint产生整型数据,后面的参数是数据产生的范围500~1000total_money += moneyprint(f'{threading.current_thread().name}今天赚了{money}元')Producer_numbers += 1lock.release()time.sleep(0.5)class Consumer(threading.Thread): # 消费钱def run(self):global total_money, Consumer_numbers,Producer_numberswhile True: # 使用while循环,让这个线程一直工作,一直产出lock.acquire()if Consumer_numbers >= max_number or (Producer_numbers == 10 and total_money <= 500): # 当消费次数达到10次就停止print(f'消费者{threading.current_thread().name}完成了工作-------------------')lock.release() # 在离开线程之前记得把锁释放掉returnmoney = random.randint(500, 1000) # 使用random库的randint产生整型数据,后面的参数是数据产生的范围500~1000if money < total_money:total_money -= moneyprint(f'{threading.current_thread().name}今天消费了{money}元')Consumer_numbers += 1else:print(f'{threading.current_thread().name}想要消费{money}元,但是账户上只有{total_money}元')lock.release()time.sleep(0.5)def main():for i in range(5):th1 = Producer(name=f'生产者{i}号')th1.start()for i in range(5):th2 = Consumer(name=f'消费者{i}号')th2.start()if __name__ == '__main__':main()
这个代码就是模拟了一下生产者不停生产,把钱存到账户total money
这个账户就是一个存放数据的容器
然后就是消费者从容器里面拿数据出来使用
这个生产钱和消费钱的过程就被分成了2部,三个板块
解读代码
首先,我们创建了两个子类,继承的是threading模块的thread类,这样我们使用这两个类的时候就可以简化一些了,只需要些类对象的名称就可以创建多线程了
这两个子类,都对run函数进行了重写,并且使用且改变了全局变量,使用global声明了全局变量,还通过全局变量控制生产者和消费者工作的次数
使用while循环,让每个线程可以一直工作,停止工作的条件就是整体完成了工作,比如,产生了10次钱,还有消费了10次钱,或者说剩下的钱钱不够消费了,被迫停止了
生产者子类
思路:
我们就是想拿到钱,这里使用随机数,可以较好的模拟大多数情况
每次产出的钱要及时的存到账户里面去,让后面的消费者可以使用账户
这里为了代码展示,我们需要控制线程工作的次数,使用全局变量number控制
每个线程每工作一次,这个总次数就加一,账户余额也增加相应的钱钱
当总次数达到了标准,所有的线程都需要停止
所以,上面的这些步骤都需要上锁
这些步骤运行完之后才能解锁,保障了多线程的稳定性
这里面有个小坑,在控制次数的时候,使用“>=”而不是“>”
消费者子类
思路:
消费者需要从账户获取一定的钱钱去消费
这里就需要多一个条件,比如,账户的钱钱够不够,账户上有没有钱钱,生产者是否存在,消费者消费的次数是否超过了标准次数等待
所以if里面的条件是:要么消费的次数够了,要么账户上没钱的同时生产者也不工作了,彻底没法再次消费了,这个线程就需要结束了
当然,也是通过产生随机数,来消费账户
因为这里有了个if判断,这个判断也是线程结束的标准,我们在循环体里面的那个解锁,还需要在if判断里面再写一次,毕竟需要及时解锁
为了方便查看是哪一个线程在工作,还自定义了线程的名称
主函数启动线程
本次的主函数就是一个启动多线程的开关,没什么特殊的地方
Condition的生产者和消费者模式-案例
threading.Condition可以在没有数据的时候处于阻塞等待状态。一旦有合适的数据了,还可以使用notify相关的函数来通知其他处于等待状态的线程。这样就可以不用做一些无用的上锁和解锁的操作。可以提高程序的性能。
常用函数如下:
acquire:上锁。
release: 解锁。
walt:将当前线程处于等待状态,并且会释放锁。可以被其他线程使用notify和notify_all函数唤醒。被唤醒后会继续等待上锁,上锁后继续执行下面的代码。
notify:通知某个正在等待的线程,默认是第1个等待的线程。 notify al:通知所有正在等待的线程。notify和notify all不会释放锁。并且需要在release之前调用
代码微动
import threading
import time
import random # 引入随机数库lock = threading.Condition() ###################total_money = 0 # 总额
max_number = 10 # 最大生产次数/最大消费次数
Producer_numbers = 0 # 生产者工作次数
Consumer_numbers = 0 # 消费者工作次数class Producer(threading.Thread): # 生产钱def run(self):global total_money,Producer_numbers,max_numberwhile True: # 使用while循环,让这个线程一直工作,一直产出lock.acquire()if Producer_numbers >= max_number: # 当生产次数达到10次的时候线程结束print(f'{threading.current_thread().name}完成了生产工作***********************')lock.release() # 在离开线程之前记得把锁释放掉returnmoney = random.randint(500,1000) # 使用random库的randint产生整型数据,后面的参数是数据产生的范围500~1000total_money += moneyprint(f'{threading.current_thread().name}今天赚了{money}元')Producer_numbers += 1lock.notify_all() ###################lock.release()time.sleep(0.5)class Consumer(threading.Thread): # 消费钱def run(self):global total_money, Consumer_numbers,Producer_numberswhile True: # 使用while循环,让这个线程一直工作,一直产出lock.acquire()money = random.randint(500, 1000)while total_money < money: # 如果余额不足,线程进入等待if Consumer_numbers >= max_number or Producer_numbers>=max_number: ###################print(f'消费者{threading.current_thread().name}完成了工作-------------------')lock.release()returnlock.wait() ###################total_money -= moneyConsumer_numbers += 1print(f'{threading.current_thread().name}今天消费了{money}元')lock.release()time.sleep(0.5)def main():for i in range(5):th1 = Producer(name=f'生产者{i}号')th1.start()for i in range(5):th2 = Consumer(name=f'消费者{i}号')th2.start()if __name__ == '__main__':main()
我把有改动的地方后面写了“###################”
-
一个是把锁改变一下,从threading.Lock()改成threading.Condition()
只有使用Condition才能使用notify和wait
-
一个是生产者,我们在生产者把钱打到账户的时候,就使用notify_all唤醒所有正在等待的线程
-
一个消费者,这里我们让消费者一直消费,在消费的时候判断余额够不够,余额不够就进入等待状态
当唤醒词打入的时候,再次判断钱够不够,如果还是不够,先判断是不是生产者或者消费者次数够了,如果是次数够了,也就是可以下班了,直接退出线程
如果余额足够,次数也没完,就把余额消费了,然后再次消费
使用了wait状态的时候,性能是要优于只有锁的时候,因为在wait状态的时候线程不占用CPU
遇到的问题
下面都是在写代码的时候产生的疑惑
threading.current_thread().name的作用
threading.current_thread().name 用于获取当前正在执行的线程的名称。这个功能在多线程程序中非常有用,主要有以下几个作用:
1. 调试和日志记录
当你在多线程环境中进行调试或记录日志时,知道当前线程的名称可以帮助你更好地了解程序的运行状态。例如,你可以在日志中打印当前线程的名称,以便追踪哪个线程在执行特定的任务。
import threadingdef worker():print(f'Worker thread name: {threading.current_thread().name}')# 创建线程
thread1 = threading.Thread(target=worker, name='Thread-1')
thread1.start()
thread1.join()
2. 线程管理
如果你的应用程序需要对特定线程进行管理或引用,线程名称可以用作唯一标识。例如,你可能会根据线程名称来决定是否终止或检查某个特定的线程。
3. 线程识别
在复杂的多线程应用程序中,特别是当多个线程执行不同的任务时,线程名称可以帮助你识别并理解线程的作用。你可以在任务开始时创建一个有意义的名称,这样在运行时更容易理解每个线程是在做什么。
4. 便于维护
使用线程名称使得代码更具可读性和可维护性。当其他开发者查看代码时,通过名称可以更容易了解各个线程的目的和功能。
总结
使用 threading.current_thread().name
可以使你的多线程程序更容易调试、管理和维护。它提供了对正在执行线程的更好了解,尤其是在处理复杂的并发任务时。
run(self): 和run(self) -> None:的区别
在Python中,函数的返回类型提示(type hint)是可选的,它提供了一种文档化函数行为的方式,但并不会影响函数的实际行为。具体来说,def run(self):
和 def run(self) -> None:
之间的区别如下:
区别:
-
返回类型提示:
def run(self):
表示这个方法没有明确的返回类型,Python会默认返回None
。def run(self) -> None:
明确指定了该方法的返回类型是None
。这是一种对其他开发者或静态类型检查工具的文档说明,表示这个函数不返回任何值。
-
不影响功能:
- 在实际的运行过程中,这两种定义的功能没有区别。即使没有返回类型提示,Python 也会假设这个函数返回的是
None
。
- 在实际的运行过程中,这两种定义的功能没有区别。即使没有返回类型提示,Python 也会假设这个函数返回的是
影响:
-
可读性:使用返回类型提示(如
-> None
)可以提高代码的可读性,使其他开发者能够快速了解这个函数的预期行为。这在较大的项目中尤其有用,可以帮助团队成员理解代码逻辑。 -
静态类型检查:如果你使用像
mypy
这样的静态类型检查工具,它们会利用这些类型提示来检查类型一致性。如果没有类型提示,工具可能会警告或忽略某些类型检查,因此在代码规范要求下,类型提示可以帮助及时捕获潜在错误。
示例:
class Singing(threading.Thread):def run(self): # 没有类型提示# 运行逻辑passdef run(self) -> None: # 有类型提示# 运行逻辑pass
总结
在你的run
方法中加上类型提示并不会影响它的行为,是否使用类型提示取决于你的编码风格和团队的规范。如果你希望提高代码的可读性和维护性,使用类型提示是一个好习惯,但不是必需的。
多线程案例——多线程爬取小说
- 生产者——————产生URL
- 消费者兼生产者——下载小说
- 消费者——————合并小说
- 主函数——————函数入口
注意事项
这里我们使用了队列queue来储存URL,需要提取导入一下队列,我们在主函数中让队列实例化,指定大小使用maxsize参数
代码展示
# 导入库
import re
import time
import requests
import threading
import queue
import os
from lxml import etreelock = threading.Lock()end_work = False # 判断生产者是否完成工作
novel_title = ''total_numbers = 0 # 给合并板块的线程一个参考的范围,这里是为了告诉它一共有多少个文件# 生产者————产生URL
def get_url(index_url,q):global novel_title,end_work,total_numbers# 从目录页获取总页数和第一页的网址格式rt = requests.get(url=index_url)# print(rt.text)html = etree.HTML(rt.text)# 创建文件夹,放小说novel_title = html.xpath('//div[@class="infotitle"]//h1//text()')[1] # 拿到小说名称if not os.path.exists(f'/{novel_title}'): # 创建小说文件夹os.mkdir(f'/{novel_title}')result1 = html.xpath('//div[@class="tag"]/a/@href') # 得到页数所在的标签Total_pages = re.findall('/read_(\\d+).html',result1[0])[0] # 获取标签里面的页数信息# Total_pages = 10 # 为了展示效果,这个Total_pages暂且设为10 ,把这条屏蔽就是全部小说了total_numbers = Total_pages # 把总页数传给合并线程# print(Total_pages)for link in range(1, int(Total_pages) + 1): # 因为是左闭右开,所以右边加一url = re.sub(f'/read_.*',f'/read_{link}.html',result1[0])q.put(url)# print(url)# 完成所有URL的制作print('生产者URL完成工作')end_work = True# 生产者兼消费者————下载URL里面的小说到本地
def download_novel(q):global novel_title,end_workwhile not end_work: # 生产者URL还没生成完毕time.sleep(0.5)while True:if q.empty() and end_work:returnlock.acquire()url = q.get()lock.release()pages = re.findall('/read_(\\d+).html', url)[0] # 获取小说的章节rt = requests.get(url=url)# print(rt.text)html = etree.HTML(rt.text)result2 = html.xpath('//div[@class="content"]//text()') # 得到小说文章result2 = "".join(result2) # 字符串拼接result2 = result2.replace('\u3000\u3000','\n') # 文本处理完毕# 写入文件title = html.xpath('//div[@class="title"]//a/text()')[0]with open(f'/{novel_title}/{pages}.text','w',encoding='utf-8') as f:f.write(str(title)+'\n' + result2 + '\n\n')print(f'{threading.current_thread().name}已下载……{title}')# 消费者——合并小说 # 其实在上面那个板块里面,把小说以追加的形式写在一个文件里面也能完成合并,这里是为了展示代码能力
def Combined_novel():global total_numbers,novel_title,end_workwhile not end_work:time.sleep(0.5)num = 1fp = open(f'/{novel_title}/{novel_title}.text','w',encoding='utf-8') # 以追加的方式创建合并小说集,如果文件不存在就会新建一个文件fp.close()fp = open(f'/{novel_title}/{novel_title}.text','a',encoding='utf-8')while True:if int(num) > int(total_numbers):breakif os.path.exists(f"/{novel_title}/{num}.text"):with open(f"/{novel_title}/{num}.text",'r',encoding='utf-8') as f:text = f.read()fp.write(text)num += 1print(f'已合并……章节{num-1}')os.remove(f"/{novel_title}/{num-1}.text")print(f'已删除……章节{num-1}')else:print(f'未发现……章节{num}')time.sleep(1) # 等待小说下载fp.close()print('已完成全部文件合并')# 函数入口
def main():q = queue.Queue(maxsize=2000) # 这里是创建了一个空队列,设置的大小是2000index_url ='https://www.80down.cc/novel/160651/' # 如果想下载别的小说,只需要修改这个地方就可以了# 当然,是这个网站里面的小说 ,其他网站的小说,可以按照这个格式爬取th1 = threading.Thread(target=get_url,args=(index_url,q,))th1.start()for i in range(4):th2 = threading.Thread(target=download_novel,args=(q,),name=f'生产者{i}号')th2.start()th3 = threading.Thread(target=Combined_novel)th3.start()if __name__ == '__main__':main()
引入的库
首先介绍引入的库,还有它在代码当中发挥的作用
- re 这个是正则表达式,负责从字符串中提取信息,在代码中提取了小说的总页数
- time 这个是时间模块,在程序中是为了等待其他的线程完成工作
- requests 这个是网络请求模块,可以模拟浏览器向网页发送请求
- threading 这个是多线程模块,有了这个模块才可以开启多线程,在主函数中应用
- queue 这是队列模块,负责存储和出取URL
- os 这个是系统模块,程序中负责创建文件夹,检测文件夹是否存在
- from lxml import etree 这个就是xPath语法的库,不懂的看我前几篇文章
- lock = threading.Lock() 这个是多线程上锁的模块,主要在下载小说的线程中应用
全局变量
- end_work 判断生产URL是否完成工作的变量,防止下载小说的线程太快造成程序崩溃,而且这个制作URL的线程很快
- novel_title 这个是储存小说书名的,在下载URL时创建文件夹使用的,还有下载小说时存临时文件用,以及合并文件的路径
- total_numbers 告知合并小说的线程,一共有多少个文件需要合并
生产者————产生URL
和之前一样,这种比较简易的网站在设计网址的时候都是只改最后面的数字充当页数,比如/read/289414_1.html
这个下划线后面的1就是第一页,根据这个规律,可以很轻松的模拟出其他的页面链接
所以,根据小说的目录页,获取第一页小说的网址,再获取小说的总页数,这个板块对网址的请求就没用了,剩下的就是设计链接了
在这里我随手就创建了存放小说的文件夹,毕竟目录页可以很轻松的获取小说的名称啦
result1就是获取总章节的页数存到Total_pages里面,这个就是全局变量第三个的来源
后面就是把设计好的链接放到队列里面去
生产者兼消费者————下载URL里面的小说到本地
这个板块引用了两个全局变量,一个是小说的名称用来下载小说时确认路径的
一个是上面那个生产者的结束判定,毕竟要把URL先存到队列里面去才能拿出来本来是只需要让这个线程等一秒就够了,但是制作URL实在是太快了,这个判断主要作为消费者合并小说的开始标志了
我们使用一共while循环,一直循环下载小说
这里我们从队列里面拿链接的时候要锁一下,一个线程拿链接的过程其他线程不能拿,防止多个线程拿同一个链接
取完链接就是线程自己的事了,互不干扰,就能解锁了
获取一下小说的章节,便于后面保存小说时使用
再接着就是使用xPath语法获取小说的内容,这时候的内容是一个列表,使用字符串拼接函数join把列表合并
发现每句话之间都有一个\u3000\u3000的字样,我们不希望看到这个东西,使用字符串替换函数replace把他们全换成\n换行符
这时候打印一下就会发现,小说的内容已经是分行显示了且中间没有空行,很完美
最后把小说保存在本地
消费者——合并小说
这个板块可有可无,我们在上面保存文件的时候完全可以把他们保存在一个文件里面,不过由于每个线程的差异,会导致某些章节的位置错乱,把这两个板块分离还是有点好处的
这个板块就不需要使用requests库了
完全就是文件操作,读取一个个单篇的小说,再按顺序保存到一个文件中去
这里的读取使用只读r的方式
保存文件使用追加的方式a,我们使用追加的方式打开文件时,按理来说,文件不存在应该会新建一个文件,但是程序报错了
这里我使用了先以只写的方式w创建文件,再关闭文件,再使用追加的方式打开文件,就解决的这个问题
在读取小说的时候,我们要按照顺序读取,所以还要使用os库判断一下小说的章节存不存在,如果存在就读取追加,然后num+1
这里的num就是我控制小说章节顺序的依据,我在前面设计保存小说的时候就是以章节数字命名,所以无论下载时间的前后都不影响按顺序读取小说
读取完的小说没有什么价值了,就把读取后的章节删除
毕竟看小说还是喜欢一直连续的看,不喜欢一直关闭打开文件吧
主函数——函数入口
没什么好说的
只有一个队列的实例化,还有多线程的启动
这里只有板块二的工作更能证明多线程的优点
还是希望读到这里的小伙伴可以跟我交流一下哪里有疑惑,大家需要互相成长,毕竟金无足赤,人无完人
完~~