线程安全和锁
一、全局解释器锁
首先需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。
GIL全称global interpreter lock,全局解释器锁。
每个线程在执行的时候都需要先获取GIL,保证同一时刻只有一个线程可以执行代码,即同一时刻只有一个线程使用CPU。在CPython中,每一个Python线程执行前都需要去获得GIL锁 ,获得该锁的线程才可以执行,没有获得的只能等待 ,当具有GIL锁的线程运行完成后,其他等待的线程就会去争夺GIL锁,这就造成了,在Python中使用多线程,但同一时刻下依旧只有一个线程在运行 ,所以Python多线程其实并不是「并行」的,而是「并发」 。
看到下图,图中是Python中GIL的工作实例,其中有3个线程,线程与线程之间是顺序执行的 ,每个线程开始执行时都会去获得GIL,防止其他线程线程运行 ,每执行完一段时间后,就会释放GIL,让别的线程可以去争夺执行权限,如果自己本身也没有执行完,则本身也会参与这次争夺 。
# 多线程的代码
import threading, timedef add(n):sum = 0while sum < n:sum += 1print(f'sum:{sum}')if __name__ == '__main__':start = time.time()n = 500000000t1 = threading.Thread(target=add, args=[n // 2])t2 = threading.Thread(target=add, args=[n // 2])t1.start()t2.start()t1.join()t2.join()print('run time: %s' % str(time.time() - start))
# 单线程的代码
import timedef add(n):sum = 0while sum < n:sum += 1print(f'sum:{sum}')if __name__ == '__main__':start = time.time()add(500000000)print('run time: %s' % str(time.time() - start))
总结
-
GIL解决方法:
-
使用其他语言写的python解释器(不推荐,还是用官方CPython好)
eg:Jython(java);IronPython(.net);pypy(Python) -
不使用多线程,使用多进程-进程里加协程实现多任务来充分利用多核CPU (推荐)
-
即使存在GIL 在有IO等待操作的程序中,还是多线程快,当然没有资源等待的还是单线程快(科学计算,累加等等)
但需要注意的是线程有了GIL后并不意味着使用Python多线程时不需要考虑线程安全 ,「GIL的存在是为了方便使用C语言编写CPython解释器的编写者,而顶层使用Python时依旧要考虑线程安全」 。
二、线程安全
当多个线程同时访问一个对象时,不管如何计算,如果调用这个对象的行为都可以获得正确的结果,那就称这个对象时线程安全的。 如果出现了“脏数据”。则线程不安全。
脏数据 :产生脏数据的原因是,当一个线程在对数据进行修改时,修改到一半时另一个线程读取了未经修改的数据并进行修改。如何避免脏数据的产生呢?一个办法就是用join方法,即先让一个线程执行完毕再执行另一个线程。但这样的本质是把多线程变成了单线程,失去了多线程的意义。另一个办法就是用线程锁。
import threadingg_number = 0def hello():global g_numberfor i in range(1000000): # 加的次数越大越容易出现资源竞争问题g_number += 1print(f'thd1运行的结果为:{g_number}')def world():global g_numberfor i in range(1000000):g_number += 1print(f'thd2运行的结果为:{g_number}')if __name__ == '__main__':thd1 = threading.Thread(target=hello)thd2 = threading.Thread(target=world)thd1.start()thd2.start()# 阻塞等待thd1.join()thd2.join()print(g_number) # 结果随机 可能小于等于2000000
三、锁
锁是Python提供给我们能够自行操控线程切换的一种手段,使用锁可以让线程的切换变的有序。
一旦线程的切换变的有序后,各个线程之间对数据的访问、修改就变的可控,所以若要保证线程安全,就必须使用锁。
threading模块中提供了5种最常见的锁,下面是按照功能进行划分:
- 同步锁:lock(一次只能放行一个)
- 递归锁:rlock(一次只能放行一个)
- 条件锁:condition(一次可以放行任意个)
- 事件锁:event(一次全部放行)
- 信号量锁:semaphore(一次可以放行特定个)
1、同步锁
同一时刻的一个进程下的一个线程只能使用一个cpu,要确保这个线程下的程序在一段时间内被cpu执,那么就要用到同步锁。只需要在对公共数据的操作前后加上上锁和释放锁的操作即可。
死锁: 指两个或两个以上的线程或进程在执行程序的过程中,因争夺资源而相互等待的一个现象。
import threadingg_number = 0
lock = threading.Lock()def hello():global g_numberfor i in range(1000000): # 加的次数越大越容易出现资源竞争问题with lock:g_number += 1print(f'thd1运行的结果为:{g_number}')def world():global g_numberfor i in range(1000000):with lock:g_number += 1print(f'thd2运行的结果为:{g_number}')if __name__ == '__main__':thd1 = threading.Thread(target=hello)thd2 = threading.Thread(target=world)thd1.start()thd2.start()# 阻塞等待thd1.join()thd2.join()print(g_number) # 结果随机 可能小于等于2000000
2、递归同步锁
在同步锁的基础上可以做到连续重复使用多次acquire()后再重复使用多次release()的操作,但是一定要注意加锁次数和解锁次数必须一致,否则也将引发死锁现象。
递归锁RLock:它内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。
3、条件锁
条件锁是在递归锁的基础上增加了能够暂停线程运行的功能。并且我们可以使用wait()与notify()来控制线程执行的个数。
注意:条件锁可以自由设定一次放行几个线程。
import threadingcurrentRunThreadNumber = 0
maxSubThreadNumber = 10def task():global currentRunThreadNumberthread_name = threading.currentThread().namewith condLock:print("线程开始启动,并马上进入等待状态 : %s" % thread_name)condLock.wait() # 暂停线程运行、等待唤醒print("线程唤醒了,开始运行后面的代码 : %s" % thread_name)currentRunThreadNumber += 1if __name__ == "__main__":condLock = threading.Condition()for i in range(maxSubThreadNumber):subThreadIns = threading.Thread(target=task)subThreadIns.start()while currentRunThreadNumber < maxSubThreadNumber:notifyNumber = int(input("请输入要唤醒几个线程:"))with condLock:condLock.notify(notifyNumber) # 放行print("main thread run end")
4、事件锁
事件锁是基于条件锁来做的,它与条件锁的区别在于一次只能放行全部,不能放行任意个数量的子线程继续运行。
我们可以将事件锁看为红绿灯,当红灯时所有子线程都暂停运行,并进入“等待”状态,当绿灯时所有子线程都恢复“运行”。
import threadingmaxSubThreadNumber = 3def task():thread_name = threading.currentThread().nameprint("线程开始启动,并马上进入等待状态 : %s" % thread_name)eventLock.wait() # 暂停运行,等待绿灯print("第一次绿灯打开,线程往下走:%s" % thread_name)eventLock.wait() # 暂停运行,等待绿灯print("第二次绿灯打开,线程往下走:%s" % thread_name)if __name__ == "__main__":eventLock = threading.Event()for i in range(maxSubThreadNumber):subThreadIns = threading.Thread(target=task)subThreadIns.start()eventLock.set() # 设置为绿灯eventLock.clear() # 设置为红灯eventLock.set()
5、信号量锁
Semaphore()
信号量锁也是根据条件锁来做的,它与条件锁和事件锁的区别如下:
- 条件锁:一次可以放行任意个处于“等待”状态的线程
- 事件锁:一次可以放行全部的处于“等待”状态的线程
- 信号量锁:通过规定,成批的放行特定(指定)个处于“上锁”状态的线程
import threading
import timemaxSubThreadNumber = 6def task():thread_name = threading.currentThread().namewith semaLock:print("线程获得锁,开始运行: %s" % thread_name)time.sleep(3)if __name__ == "__main__":semaLock = threading.Semaphore(2)for i in range(maxSubThreadNumber):subThreadIns = threading.Thread(target=task)subThreadIns.start()