gil锁
本文中的python都指cpython
gil全称为global interpreter lock,python中的一个线程分别对应c语言中的一个线程。GIL的存在使得同一时刻只能有一个线程在CPU上执行字节码,这就意味着不能像其他编程语言一样将多个线程映射到多个CPU上以实现真正的并发编程
所以GIL锁的存在使得python真正的并发编程成为一种奢望,但可以在一个CPU上快速的切换线程,已达到类并发的效果,以减少线程的等待时间。
GIL锁是可以释放的,满足以下任一条件都可以释放GIL锁
- GIL会在执行的时间片满了或者字节数满了之后释放GIL锁
- 在遇到IO操作时会释放锁
多线程演变的原因在于:进程对系统资源的消耗巨大,其次进程间相互隔离。但是对于文件IO操作,多线程和多进程性能差别不大,甚至多进程的速度会比多线程稍快
每个多线程启动时,都会存在一个主线程main thread
,默认主线程退出时,子线程会被kill掉,所以threading
模块中提供的join
函数,手动将子线程挂起,待子线程执行完之后再执行主线程
Thread提供了
1 | import threading |
如果加上join
函数
1 | ... |
线程间通信
线程间通信可通过以下两种方式
- 全局变量
- 队列通信
利用全局变量global
可方便实现多线程之间通信,但是由于GIL锁的存在,可能会造成由于线程的时间片或者字节数满了,而造成线程间切换,而带来数据丢失
1 | a = 0 |
当然,可以通过加锁的方式来保证线程的安全,随之带来的是线程性能的下降,所以可通过queue
的方式来进行线程间的通信
1 | def add(num, queue): |
不过需要注意的是,Queue的get和put方法中,还是使用的锁,都是使用的,通过查看put源码,可发现
1 | with self.not_full: |
当队列不为空,阻塞时,队列会等待放数据,然后通知下一个线程
与全局变量相比,队列安全,方便,开箱即用。当然,队列也存在性能方便的问题,加锁之后,性能下降是无法避免的。Queue可指定队列的maxsize
参数,因为maxsize过大会占用过多内存,所以使用时最好指定合适的大小。
线程同步
线程同步我理解的是用来保证线程安全的方式,一般来说线程同步有三种方式,分别如下
- Lock,加锁
- RLock,也是加锁,只是可重入的锁
- Condition,复杂线程同步
还是回到之前说的全局变量的那个例子中,为了更好的展示python中字节码的接在过程,使用如下例子演示
1 | def add(a): |
由于GIL的存在,这两个函数在使用多线程的时候,有可能会存在以下的情况:先加载add中的a,在加载sub中的a;加载add中的1,再加载sub中的1;add中+运算,sub中-运算;add中赋值操作,sub中赋值操作。由此,得出来最后的结果为-1,而不是0。
为了解决以上问题,先试试Lock,执行部分代码如下
1 | ... |
acquire
是为这个线程加上一把锁,release
是释放这个线程的锁,需要注意的是,当前的线程必须要释放掉当前的这把锁之后才能进行下一步,于是就带来了Lock最大的弊端,容易死锁
于是,由于Lock的存在,带来了RLock,可重入的锁,这可以连续多次调用acquire
,只要保证一个线程中的acquire和release个数是相同的,就不会造成死锁,这就为多线程之间相互调用提供了可能,比较方便的是,RLock和Lock的接口一致性,也是使用的acquire
和release
但是对于复杂场景下的线程同步,这两种锁显然满足不了需求,比如,多个线程之间的对话。为了解决这种机制问题,可使用Condition模块,condition会先执行该线程之后,会使用notify方法通知另外一个线程,并且使用wait方法等待另外一个线程的通知,这种情况下线程启动的先后顺序就显得尤为重要了。由此种机制可以理解,这其实也是用的锁,由Condition的源码也可以看见,__init__
构造函数中还是加了一把锁
1 | if lock is None: |
而由于condition中实现了__enter__
和__exit__
,而这两个方法底层调用的是RLock中的__enter__
和__exit__
,而RLock中的这两个方法是调用的acquire
和lock
方法,于是RLock中的加锁和取锁可以简化为
1 | with lock: |
1 | def say1(cond): |
值得注意的是,wait
方法调用时,会先释放底层锁,然后阻塞,直到另一个线程调用notify()或者notify_all()方法为止,或者timeout超时为止,当被唤醒或者超时,它将重新获得锁并返回
在web开发中,通常会使用限制同时爬取的线程个数来控制反爬,在多线程中,可通过Semaphore
方法来实现,例子如下
1 | class GetUrlList(threading.Thread): |
Semaphore
默认的线程个数为1个,可自行指定个数,需要注意的是,acquire和release方法中还是用的condition的锁
线程池
对于多线程的线程池,Semaphore其实可以作为是一个简易的线程池,但是遇到复杂场景下,它不能很好的处理,比如:主线程获取某一个线程或者任务的状态和返回值;当一个线程执行完之后能马上通知主线程并返回。
这时,就需要通过另外的方法了,由此引申出concurrent模块的futures
方法,其中的ThreadPoolExecutor
方法封装了对于多线程的各种复杂方法,且与多进程的接口一致性,用来保证可复用性。
1 | def sleep(sec): |
submit
方法会在线程执的时候立即返回,如上例,会先打印字符串,再等待几秒
done
方法会返回当前线程是否已经执行完成,完成返回True,否则为False
1 | task1 = executor.submit(sleep, 2) |
1 | task1 = executor.submit(sleep, 2) |
result
方法可以获取线程的返回值
1 | print(task1.result()) |
如果想要获取已经成功完成的返回结果,可以通过futures
模块下的as_completed
方法获取,如下:
1 | from concurrent.futures import ThreadPoolExecutor, as_completed |
它的特点是先完成先返回,比如上例中,先返回2秒的线程,最后返回4秒的线程
此外,还提供了一种更为简便的方法,如下
1 | for data in executor.map(sleep, secs): |
它与as_completed
方法存在些许不同,map方法的执行顺序是按照可迭代对象的顺序执行,as_completed执行顺序是按照线程完成的先后顺序执行,在使用的时候需注意这点。
多进程编程
由于GIL锁的存在,多线程不能有效的利用多核的CPU,从而没办法达到真正的并发操作,python中所谓的多线程其实是快速的切换同一个CPU而已,只是在多个线程可能存在的网络延迟,程序处理中,还是很有必要的。相比之下,多进程能实现真正的并发编程,能使用多个CPU同时工作,但是,进程间资源不共享,进程切换带来的巨大消耗,使得多进程编程无法成为大多数并发编程的主流。于是,综合以上,得出以下经验
- 对于消耗CPU资源大的操作,可尽量使用多进程编程,例如,算术操作
- 对于IO频繁的操作,尽量使用多线程编程
在多进程中,通常使用futures
模块中的ProcessPoolExecutor
方法,因为该方法具有与ThreadPoolExecutor
的多线程编程的一致接口,换个名字即可,在此不做过多示例,但是必须注意的是,在使用该方法时,必须将多进程相关操作放在if __name__ == '__main__':
之后(windows平台)。
以下多进程相关操作,都是用multiprocessing
模块演示
1 | def sleep(sec): |
操作与threading
模块中多线程的操作类似
可使用multiprocessing
模块下的Pool
方法来设定进程池,且进程池默认为当前CPU的个数
1 | pool = multiprocessing.Pool(multiprocessing.cpu_count()) |
tips:必须先关闭pool,再挂起pool
如果想要获取进程执行结果,可使用get
方法
1 | print(result1.get()) |
进程池还提供了imap
方法,类似于executor.map()
的功能,执行的顺序与传递的可迭代对象的顺序相同
1 | pool = multiprocessing.Pool(multiprocessing.cpu_count()) |
此外,还提供了map
方法,类似于as_completed
方法,执行的顺序按照时间先后顺序,在此不再赘述
多进程间通信
由于进程间资源是不共享的,所以多进程通信无法使用全局变量,可使用以下方式来进行通信
- 使用
multiprocessing
模块下的Queue
方法来进行通信,却无法使用之前使用的原生Queue
来进行通信,但是此方法无法在进程池中使用 - 进程池下的通信可使用
multiprocessing
模块下的Manager
方法来进行通信(Manager().Queue()
) - 使用
pipe
管道进行通信
1 | def add(queue): |
进程池通信
1 | queue = Manager().Queue(100) |
管道通信
1 | def add(pipe): |
管道通信只能应用于两个进程之间,并且它的性能要高于使用Queue,原因在于队列使用了很多锁,这会带来性能的下降
进程间内存共享
有些时候我们需要使用多个进程对同一个内存空间进行操作,比如,多个进程修改同一个字典。multiprocessing
模块中的Manager
方法提供了丰富的数据结构,以供内存共享来使用,比如常见的dict
,list
,array
以及Lock
,RLock
,Condition
等等。
1 | def add_list(p_list, new): |