多线程与多进程

gil锁

本文中的python都指cpython

gil全称为global interpreter lock,python中的一个线程分别对应c语言中的一个线程。GIL的存在使得同一时刻只能有一个线程在CPU上执行字节码,这就意味着不能像其他编程语言一样将多个线程映射到多个CPU上以实现真正的并发编程

所以GIL锁的存在使得python真正的并发编程成为一种奢望,但可以在一个CPU上快速的切换线程,已达到类并发的效果,以减少线程的等待时间。

GIL锁是可以释放的,满足以下任一条件都可以释放GIL锁

  • GIL会在执行的时间片满了或者字节数满了之后释放GIL锁
  • 在遇到IO操作时会释放锁

多线程演变的原因在于:进程对系统资源的消耗巨大,其次进程间相互隔离。但是对于文件IO操作,多线程和多进程性能差别不大,甚至多进程的速度会比多线程稍快

每个多线程启动时,都会存在一个主线程main thread,默认主线程退出时,子线程会被kill掉,所以threading模块中提供的join函数,手动将子线程挂起,待子线程执行完之后再执行主线程

Thread提供了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import threading
import time

def test1():
time.sleep(2)

def test2():
time.sleep(3)

if __name__=='__main__':
thread1 = threading.Thread(target=test1)
thread2 = threading.Thread(target=test2)
start = time.time()
thread1.start()
thread2.start()
print('total time spend: {}'.format(time.time() - start))
## output:total time spend: 0.0

如果加上join函数

1
2
3
4
5
6
7
...
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print('total time spend: {}'.format(time.time() - start))
## output:total time spend: 3.00200009346

线程间通信

线程间通信可通过以下两种方式

  1. 全局变量
  2. 队列通信

利用全局变量global可方便实现多线程之间通信,但是由于GIL锁的存在,可能会造成由于线程的时间片或者字节数满了,而造成线程间切换,而带来数据丢失

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
a = 0
def add(num):
global a
for i in range(num):
a += i
def sub(num):
global a
for i in range(num):
a -= i
thread1 = threading.Thread(target=add, args=(1000000, ))
thread2 = threading.Thread(target=sub, args=(1000000, ))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(a)
# output: -262517192310

当然,可以通过加锁的方式来保证线程的安全,随之带来的是线程性能的下降,所以可通过queue的方式来进行线程间的通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def add(num, queue):
for i in range(num):
queue.put(num)

def sub(num, queue):
for i in range(num):
queue.get(num)

queue = Queue(maxsize=100)
thread1 = threading.Thread(target=add, args=(1000000, queue))
thread2 = threading.Thread(target=sub, args=(1000000, queue))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(queue.qsize())
# output: 0

不过需要注意的是,Queue的get和put方法中,还是使用的锁,都是使用的,通过查看put源码,可发现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
with self.not_full:
if self.maxsize > 0:
if not block:
if self._qsize() >= self.maxsize:
raise Full
elif timeout is None:
while self._qsize() >= self.maxsize:
self.not_full.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = time() + timeout
while self._qsize() >= self.maxsize:
remaining = endtime - time()
if remaining <= 0.0:
raise Full
self.not_full.wait(remaining)
self._put(item)
self.unfinished_tasks += 1
self.not_empty.notify()

当队列不为空,阻塞时,队列会等待放数据,然后通知下一个线程

与全局变量相比,队列安全,方便,开箱即用。当然,队列也存在性能方便的问题,加锁之后,性能下降是无法避免的。Queue可指定队列的maxsize参数,因为maxsize过大会占用过多内存,所以使用时最好指定合适的大小。

线程同步

线程同步我理解的是用来保证线程安全的方式,一般来说线程同步有三种方式,分别如下

  1. Lock,加锁
  2. RLock,也是加锁,只是可重入的锁
  3. Condition,复杂线程同步

还是回到之前说的全局变量的那个例子中,为了更好的展示python中字节码的接在过程,使用如下例子演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def add(a):
a +=1

def sub(a):
a -= 1

print(dis.dis(add))
print(dis.dis(sub))
# 7 0 LOAD_FAST 0 (a)
# 2 LOAD_CONST 1 (1)
# 4 INPLACE_ADD
# 6 STORE_FAST 0 (a)
# 8 LOAD_CONST 0 (None)
# 10 RETURN_VALUE
#None
# 10 0 LOAD_FAST 0 (a)
# 2 LOAD_CONST 1 (1)
# 4 INPLACE_SUBTRACT
# 6 STORE_FAST 0 (a)
# 8 LOAD_CONST 0 (None)
# 10 RETURN_VALUE
#None

由于GIL的存在,这两个函数在使用多线程的时候,有可能会存在以下的情况:先加载add中的a,在加载sub中的a;加载add中的1,再加载sub中的1;add中+运算,sub中-运算;add中赋值操作,sub中赋值操作。由此,得出来最后的结果为-1,而不是0。

为了解决以上问题,先试试Lock,执行部分代码如下

1
2
3
4
5
...
lock.acquire()
a += 1
lock.release()
...

acquire是为这个线程加上一把锁,release是释放这个线程的锁,需要注意的是,当前的线程必须要释放掉当前的这把锁之后才能进行下一步,于是就带来了Lock最大的弊端,容易死锁

于是,由于Lock的存在,带来了RLock,可重入的锁,这可以连续多次调用acquire,只要保证一个线程中的acquire和release个数是相同的,就不会造成死锁,这就为多线程之间相互调用提供了可能,比较方便的是,RLock和Lock的接口一致性,也是使用的acquirerelease

但是对于复杂场景下的线程同步,这两种锁显然满足不了需求,比如,多个线程之间的对话。为了解决这种机制问题,可使用Condition模块,condition会先执行该线程之后,会使用notify方法通知另外一个线程,并且使用wait方法等待另外一个线程的通知,这种情况下线程启动的先后顺序就显得尤为重要了。由此种机制可以理解,这其实也是用的锁,由Condition的源码也可以看见,__init__构造函数中还是加了一把锁

1
2
3
if lock is None:
lock = RLock()
self._lock = lock

而由于condition中实现了__enter____exit__,而这两个方法底层调用的是RLock中的__enter____exit__,而RLock中的这两个方法是调用的acquirelock方法,于是RLock中的加锁和取锁可以简化为

1
2
with lock:
a -= 1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def say1(cond):
with cond:
cond.wait()
print('say 1')
cond.notify()

cond.wait()
print('say 2')
cond.notify()


def say3(cond):
with cond:
print('say 3')
cond.notify()
cond.wait()

print('say 4')
cond.notify()
cond.wait()
cond = threading.Condition()
threading.Condition()
thread1 = threading.Thread(target=say1, args=(cond, ))
thread2 = threading.Thread(target=say3, args=(cond, ))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
# say 3
# say 1
# say 4
# say 2

值得注意的是,wait方法调用时,会先释放底层锁,然后阻塞,直到另一个线程调用notify()或者notify_all()方法为止,或者timeout超时为止,当被唤醒或者超时,它将重新获得锁并返回

在web开发中,通常会使用限制同时爬取的线程个数来控制反爬,在多线程中,可通过Semaphore方法来实现,例子如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class GetUrlList(threading.Thread):
def __init__(self, sem):
super().__init__()
self.sem = sem

def run(self):
for i in range(20):
self.sem.acquire()
detail_html = ParseDetail('https://www.baidu.com/{}'.format(i), self.sem)
detail_html.start()

class ParseDetail(threading.Thread):
def __init__(self, url, sem):
super().__init__()
self.sem = sem
self.url = url

def run(self):
time.sleep(2)
print('parse html success')
self.sem.release()
if __name__ == '__main__':
sem = threading.Semaphore(3)
url_list = GetUrlList(sem)
url_list.start()

Semaphore默认的线程个数为1个,可自行指定个数,需要注意的是,acquire和release方法中还是用的condition的锁

线程池

对于多线程的线程池,Semaphore其实可以作为是一个简易的线程池,但是遇到复杂场景下,它不能很好的处理,比如:主线程获取某一个线程或者任务的状态和返回值;当一个线程执行完之后能马上通知主线程并返回。

这时,就需要通过另外的方法了,由此引申出concurrent模块的futures方法,其中的ThreadPoolExecutor方法封装了对于多线程的各种复杂方法,且与多进程的接口一致性,用来保证可复用性。

1
2
3
4
5
6
7
8
def sleep(sec):
print('i sleep {} sec'.format(sec))
time.sleep(sec)
return sec

executor = ThreadPoolExecutor(max_workers=2)
task1 = executor.submit(sleep, 2)
task2 = executor.submit(sleep, 3)

submit方法会在线程执的时候立即返回,如上例,会先打印字符串,再等待几秒

done方法会返回当前线程是否已经执行完成,完成返回True,否则为False

1
2
3
4
task1 = executor.submit(sleep, 2)
task2 = executor.submit(sleep, 3)
print(task1.done())
#output: False
1
2
3
4
5
task1 = executor.submit(sleep, 2)
task2 = executor.submit(sleep, 3)
time.sleep(2)
print(task1.done())
#output: True

result方法可以获取线程的返回值

1
print(task1.result())

如果想要获取已经成功完成的返回结果,可以通过futures模块下的as_completed方法获取,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from concurrent.futures import ThreadPoolExecutor, as_completed
...
executor = ThreadPoolExecutor(max_workers=2)
secs = [3, 2, 4]
tasks = [executor.submit(sleep, sec) for sec in secs]
for future in as_completed(tasks):
data = future.result()
print('this is {} sec'.format(data))
# i sleep 3 sec
# i sleep 2 sec
# i sleep 4 sec
# this is 2 sec
# this is 3 sec
# this is 4 sec

它的特点是先完成先返回,比如上例中,先返回2秒的线程,最后返回4秒的线程

此外,还提供了一种更为简便的方法,如下

1
2
3
4
5
6
7
8
for data in executor.map(sleep, secs):
print('this is {} sec'.format(data))
#i sleep 3 sec
#i sleep 2 sec
#i sleep 4 sec
#this is 3 sec
#this is 2 sec
#this is 4 sec

它与as_completed方法存在些许不同,map方法的执行顺序是按照可迭代对象的顺序执行,as_completed执行顺序是按照线程完成的先后顺序执行,在使用的时候需注意这点。

多进程编程

由于GIL锁的存在,多线程不能有效的利用多核的CPU,从而没办法达到真正的并发操作,python中所谓的多线程其实是快速的切换同一个CPU而已,只是在多个线程可能存在的网络延迟,程序处理中,还是很有必要的。相比之下,多进程能实现真正的并发编程,能使用多个CPU同时工作,但是,进程间资源不共享,进程切换带来的巨大消耗,使得多进程编程无法成为大多数并发编程的主流。于是,综合以上,得出以下经验

  1. 对于消耗CPU资源大的操作,可尽量使用多进程编程,例如,算术操作
  2. 对于IO频繁的操作,尽量使用多线程编程

在多进程中,通常使用futures模块中的ProcessPoolExecutor方法,因为该方法具有与ThreadPoolExecutor的多线程编程的一致接口,换个名字即可,在此不做过多示例,但是必须注意的是,在使用该方法时,必须将多进程相关操作放在if __name__ == '__main__':之后(windows平台)。

以下多进程相关操作,都是用multiprocessing模块演示

1
2
3
4
5
6
7
8
9
10
11
12
def sleep(sec):
time.sleep(sec)
print('i sleep {} sec'.format(sec))
return sec

if __name__ == '__main__':
p1 = multiprocessing.Process(target=sleep, args=(2, ))
p2 = multiprocessing.Process(target=sleep, args=(3, ))
p1.start()
p2.start()
p1.join()
p2.join()

操作与threading模块中多线程的操作类似

可使用multiprocessing模块下的Pool方法来设定进程池,且进程池默认为当前CPU的个数

1
2
3
4
5
pool = multiprocessing.Pool(multiprocessing.cpu_count())
result1 = pool.apply_async(sleep, args=(2, ))
result2 = pool.apply_async(sleep, args=(3, ))
pool.close()
pool.join()

tips:必须先关闭pool,再挂起pool

如果想要获取进程执行结果,可使用get方法

1
print(result1.get())

进程池还提供了imap方法,类似于executor.map()的功能,执行的顺序与传递的可迭代对象的顺序相同

1
2
3
4
5
6
7
8
9
pool = multiprocessing.Pool(multiprocessing.cpu_count())
for data in pool.imap(sleep, [3, 2, 4]):
print(data)
#i sleep 2 sec
#i sleep 3 sec
#3
#2
#i sleep 4 sec
#4

此外,还提供了map方法,类似于as_completed方法,执行的顺序按照时间先后顺序,在此不再赘述

多进程间通信

由于进程间资源是不共享的,所以多进程通信无法使用全局变量,可使用以下方式来进行通信

  • 使用multiprocessing模块下的Queue方法来进行通信,却无法使用之前使用的原生Queue来进行通信,但是此方法无法在进程池中使用
  • 进程池下的通信可使用multiprocessing模块下的Manager方法来进行通信(Manager().Queue())
  • 使用pipe管道进行通信
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def add(queue):
queue.put('a')

def sub(queue):
data = queue.get('a')
print(data)

if __name__ == '__main__':
queue = Queue(100)
p1 = multiprocessing.Process(target=add, args=(queue, ))
p2 = multiprocessing.Process(target=sub, args=(queue, ))
p1.start()
p2.start()
p1.join()
p2.join()
#output: a

进程池通信

1
2
3
4
5
6
7
queue = Manager().Queue(100)
pool = multiprocessing.Pool(multiprocessing.cpu_count())
pool.apply_async(add, args=(queue, ))
pool.apply_async(sub, args=(queue, ))
pool.close()
pool.join()
#output: a

管道通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def add(pipe):
pipe.send('jack')

def sub(pipe):
data = pipe.recv()
print(data)

if __name__ == '__main__':
recevie_pipe, send_pipe = Pipe()
p1 = multiprocessing.Process(target=add, args=(send_pipe, ))
p2 = multiprocessing.Process(target=sub, args=(recevie_pipe, ))
p1.start()
p2.start()
p1.join()
p2.join()
#output: jack

管道通信只能应用于两个进程之间,并且它的性能要高于使用Queue,原因在于队列使用了很多锁,这会带来性能的下降

进程间内存共享

有些时候我们需要使用多个进程对同一个内存空间进行操作,比如,多个进程修改同一个字典。multiprocessing模块中的Manager方法提供了丰富的数据结构,以供内存共享来使用,比如常见的dict,list,array以及Lock,RLock,Condition等等。

1
2
3
4
5
6
7
8
9
10
11
12
13
def add_list(p_list, new):
p_list.append(new)

if __name__ == '__main__':
process_list = Manager().list()
p1 = multiprocessing.Process(target=add_list, args=(process_list, 'tang', ))
p2 = multiprocessing.Process(target=add_list, args=(process_list, 'jack', ))
p1.start()
p2.start()
p1.join()
p2.join()
print(process_list)
#output: ['tang', 'jack']
-------------本文结束感谢您的阅读-------------